跳到主要内容

技术设计文档

版本: v1.0 | 创建日期: 2026-01-13 | 分支: feature/points-system | 作者: AXBlade Team

目录

  1. 架构设计
  2. 数据库设计
  3. API设计
  4. 核心服务实现
  5. 后台任务设计
  6. 性能优化
  7. 安全设计
  8. 测试策略

架构设计

事件驱动流程

交易积分流程

┌──────────────────────────────────────────────────────────────────────┐
│ Trade Event Flow → Points │
└──────────────────────────────────────────────────────────────────────┘

MatchingEngine

│ TradeEvent via broadcast::channel

┌──────────────────┐
│ Trade Listener │ ◄─── 已存在于 main.rs
└────────┬─────────┘

│ After persist_trade() success

┌──────────────────┐ ┌────────────────────────────────┐
│ PointsService │────▶│ 1. calculate_trading_points() │
│ │ │ - Query user's tier │
│ │ │ - Calculate: vol × tier × α │
│ │ │ - Record points_event │
│ │ │ - Update summary (atomic) │
│ │ └────────────────────────────────┘
│ │ ┌────────────────────────────────┐
│ │────▶│ 2. calculate_referral_points() │
│ │ │ - Query referral relation │
│ │ │ - Calculate referrer points │
│ │ │ - Record event for referrer │
│ │ └────────────────────────────────┘
└──────────────────┘

│ Async (non-blocking)

┌──────────────────┐
│ Update Cache │ (invalidate user_points, tier)
└──────────────────┘

持仓积分流程

┌──────────────────────────────────────────────────────────────────────┐
│ Holding Points Background Task (Hourly) │
└──────────────────────────────────────────────────────────────────────┘

Tokio Scheduler (每小时)


┌────────────────────────────┐
│ calculate_holding_points() │
└──────────┬─────────────────┘


┌─────────────────────────────────────────────────────────┐
│ Batch Process All Active Positions │
│ 1. Query: SELECT * FROM positions WHERE status='open' │
│ 2. For each position: │
│ - value = size × mark_price │
│ - hours = (now - last_calc_time).hours() │
│ - points = value × hours × 0.00001 │
│ - Insert points_event │
│ - Update user_points_summary │
│ - Update position.last_calc_time = now │
└─────────────────────────────────────────────────────────┘

集成点

与现有系统的集成点:

| 现有系统模块 | 集成方式 | 集成点 |
| --- | --- | --- |
| MatchingEngine | 监听 TradeEvent | 计算交易积分 + 推荐积分 |
| PositionService | 查询活跃仓位 | 定时计算持仓积分 |
| ReferralService | 查询推荐关系 | 分配推荐积分 |
| BlockchainService | 监听质押事件 | 记录质押 + 计算质押积分 |
| Database | 共享连接池 | 所有数据持久化 |
| CacheManager | 统一缓存层 | 积分/Tier缓存 |

数据流向

实时数据流:
Trade → Points Event → Update Summary → Invalidate Cache → Broadcast Update

定时数据流:
Scheduler → Batch Calculate → Insert Events → Update Summaries → Refresh Cache

查询数据流:
API Request → Check Cache → Miss → Query DB → Update Cache → Return

数据库设计

表结构设计

Epoch配置表

-- Epoch (season) configuration: one row per points epoch.
CREATE TABLE IF NOT EXISTS points_epochs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    epoch_number INT NOT NULL UNIQUE,
    start_time TIMESTAMPTZ NOT NULL,
    end_time TIMESTAMPTZ NOT NULL,
    duration_days INT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    -- Lifecycle: pending (not started), active (running), ended (finished), settled (paid out)

    config JSONB,
    -- Configuration snapshot for this epoch, e.g.:
    -- {
    -- "tiers": [{"name": "T1", "min": 0, "max": 99999, "multiplier": 1.0}, ...],
    -- "point_rates": {"trading": 0.0001, "pnl": 0.001, ...}
    -- }

    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Reject epochs whose window is empty or inverted.
    CONSTRAINT chk_epoch_time_range CHECK (end_time > start_time),
    -- Keep status within the documented lifecycle so a typo cannot hide an epoch from status lookups.
    CONSTRAINT chk_epoch_status CHECK (status IN ('pending', 'active', 'ended', 'settled'))
);

-- IF NOT EXISTS keeps the whole script re-runnable, matching the guarded CREATE TABLE above.
CREATE INDEX IF NOT EXISTS idx_epoch_status ON points_epochs(status);
CREATE INDEX IF NOT EXISTS idx_epoch_time ON points_epochs(start_time, end_time);

用户积分汇总表

-- Per-user, per-epoch running totals; one row per (user_address, epoch_number).
CREATE TABLE IF NOT EXISTS user_points_summary (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_address VARCHAR(42) NOT NULL,
    epoch_number INT NOT NULL,

    -- The five point categories
    trading_points DECIMAL(20, 2) NOT NULL DEFAULT 0,
    pnl_points DECIMAL(20, 2) NOT NULL DEFAULT 0,
    holding_points DECIMAL(20, 2) NOT NULL DEFAULT 0,
    referral_points DECIMAL(20, 2) NOT NULL DEFAULT 0,
    staking_points DECIMAL(20, 2) NOT NULL DEFAULT 0,

    -- Grand total across all categories
    total_points DECIMAL(20, 2) NOT NULL DEFAULT 0,

    -- Trading statistics
    trading_volume DECIMAL(30, 6) NOT NULL DEFAULT 0,
    trade_count INT NOT NULL DEFAULT 0,
    realized_pnl DECIMAL(30, 6) NOT NULL DEFAULT 0,

    -- Tier information
    tier VARCHAR(10), -- T1, T2, T3, T4
    tier_multiplier DECIMAL(5, 2) DEFAULT 1.0,

    -- Referral statistics
    referral_count INT NOT NULL DEFAULT 0,
    referral_volume DECIMAL(30, 6) NOT NULL DEFAULT 0,

    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    CONSTRAINT unique_user_epoch UNIQUE(user_address, epoch_number)
);

-- No separate (user_address, epoch_number) index: the unique_user_epoch constraint above
-- already creates one on exactly those columns, and a duplicate would only slow down writes.
-- IF NOT EXISTS keeps the script re-runnable, matching the guarded CREATE TABLE.
CREATE INDEX IF NOT EXISTS idx_epoch_total_points ON user_points_summary(epoch_number, total_points DESC);
CREATE INDEX IF NOT EXISTS idx_epoch_trading_points ON user_points_summary(epoch_number, trading_points DESC);
CREATE INDEX IF NOT EXISTS idx_epoch_tier ON user_points_summary(epoch_number, tier);

积分事件明细表 (TimescaleDB)

-- Append-only ledger of individual points awards (TimescaleDB hypertable on created_at).
CREATE TABLE IF NOT EXISTS points_events (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    user_address VARCHAR(42) NOT NULL,
    epoch_number INT NOT NULL,
    point_type VARCHAR(20) NOT NULL,
    -- One of: trading, pnl, holding, referral, staking

    points DECIMAL(20, 2) NOT NULL,

    -- Related entities
    related_trade_id UUID, -- related trade id
    related_order_id UUID, -- related order id
    related_position_id UUID, -- related position id
    referrer_address VARCHAR(42), -- referrer address (referral events only)

    -- Free-form metadata
    metadata JSONB,
    -- Examples:
    -- Trading: {"volume": 100000, "tier": "T2", "multiplier": 1.1}
    -- PnL: {"realized_pnl": 5000, "position_size": 10}
    -- Holding: {"position_value": 50000, "hours": 24}
    -- Referral: {"referee": "0x...", "referee_volume": 10000}
    -- Staking: {"staking_amount": 10000, "days": 1}

    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- TimescaleDB requires every primary key / unique index on a hypertable to include the
    -- partitioning column, so the key is (id, created_at); a key on id alone makes
    -- create_hypertable fail.
    PRIMARY KEY (id, created_at)
);

-- Convert to a TimescaleDB hypertable (time-series optimisation)
SELECT create_hypertable('points_events', 'created_at', if_not_exists => TRUE);

-- IF NOT EXISTS keeps the script re-runnable, matching the guarded CREATE TABLE.
CREATE INDEX IF NOT EXISTS idx_user_events ON points_events(user_address, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_epoch_events ON points_events(epoch_number, created_at);
CREATE INDEX IF NOT EXISTS idx_point_type ON points_events(point_type, created_at);
CREATE INDEX IF NOT EXISTS idx_related_trade ON points_events(related_trade_id) WHERE related_trade_id IS NOT NULL;

-- Compression has to be enabled on the hypertable before a compression policy can be added.
-- Segmenting by user keeps per-user history lookups cheap on compressed chunks.
ALTER TABLE points_events SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'user_address',
    timescaledb.compress_orderby = 'created_at DESC'
);

-- Compression policy (compress chunks older than 7 days to save storage)
SELECT add_compression_policy('points_events', INTERVAL '7 days', if_not_exists => TRUE);

-- Retention policy (drop chunks older than 90 days)
-- NOTE(review): if the admin "recalculate" endpoint replays raw events, an epoch longer than
-- 90 days could no longer be fully recalculated — confirm against the recalculation design.
SELECT add_retention_policy('points_events', INTERVAL '90 days', if_not_exists => TRUE);

交易量Tier配置表

-- Volume tiers and their point multipliers; epoch_number NULL marks the global defaults.
CREATE TABLE IF NOT EXISTS trading_tier_config (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tier_name VARCHAR(10) NOT NULL, -- T1, T2, T3, T4
    min_volume DECIMAL(30, 6) NOT NULL,
    max_volume DECIMAL(30, 6), -- NULL means no upper bound
    multiplier DECIMAL(5, 2) NOT NULL,
    epoch_number INT, -- NULL means global default
    is_active BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- NULLS NOT DISTINCT (PostgreSQL 15+): a plain UNIQUE treats NULL epoch_number values as
    -- distinct, so the global default rows could be inserted any number of times.
    CONSTRAINT unique_tier_epoch UNIQUE NULLS NOT DISTINCT (tier_name, epoch_number)
);

-- IF NOT EXISTS keeps the script re-runnable, matching the guarded CREATE TABLE.
CREATE INDEX IF NOT EXISTS idx_tier_epoch ON trading_tier_config(epoch_number, is_active);
CREATE INDEX IF NOT EXISTS idx_tier_volume ON trading_tier_config(min_volume, max_volume);

-- Seed the default tiers.
-- Upper bounds use the full 6-decimal precision of the volume columns so that a cumulative
-- volume such as 99999.995 does not fall between T1 and T2.
-- ON CONFLICT DO NOTHING makes the seed safe to re-run.
INSERT INTO trading_tier_config (tier_name, min_volume, max_volume, multiplier) VALUES
    ('T1', 0, 99999.999999, 1.0),
    ('T2', 100000, 499999.999999, 1.1),
    ('T3', 500000, 999999.999999, 1.3),
    ('T4', 1000000, NULL, 1.5)
ON CONFLICT DO NOTHING;

质押记录表

-- Staking records used as the basis for staking points.
CREATE TABLE IF NOT EXISTS points_staking (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_address VARCHAR(42) NOT NULL,
    amount DECIMAL(30, 6) NOT NULL,
    token_address VARCHAR(42) NOT NULL, -- address of the staked token

    start_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    end_time TIMESTAMPTZ,
    status VARCHAR(20) NOT NULL DEFAULT 'active',
    -- One of: active (currently staked), withdrawn (already withdrawn)

    tx_hash VARCHAR(66), -- hash of the staking transaction
    withdraw_tx_hash VARCHAR(66), -- hash of the withdrawal transaction

    last_calculated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- When points were last calculated for this stake (basis for incremental calculation)

    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- IF NOT EXISTS keeps the script re-runnable, matching the guarded CREATE TABLE.
CREATE INDEX IF NOT EXISTS idx_user_staking ON points_staking(user_address, status);
CREATE INDEX IF NOT EXISTS idx_staking_status ON points_staking(status, start_time);
CREATE INDEX IF NOT EXISTS idx_staking_time ON points_staking(start_time, end_time);

排行榜缓存表

-- Materialised leaderboard: one row per (epoch, ranking type, user).
CREATE TABLE IF NOT EXISTS points_leaderboard (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    epoch_number INT NOT NULL,
    rank_type VARCHAR(20) NOT NULL,
    -- One of: total, trading, pnl, holding, referral, staking

    user_address VARCHAR(42) NOT NULL,
    rank INT NOT NULL,
    points DECIMAL(20, 2) NOT NULL,

    -- Extra display information
    username VARCHAR(100), -- user nickname, if any
    tier VARCHAR(10), -- user's tier

    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    CONSTRAINT unique_leaderboard_entry UNIQUE(epoch_number, rank_type, user_address)
);

-- IF NOT EXISTS keeps the script re-runnable, matching the guarded CREATE TABLE.
CREATE INDEX IF NOT EXISTS idx_leaderboard ON points_leaderboard(epoch_number, rank_type, rank);
CREATE INDEX IF NOT EXISTS idx_leaderboard_user ON points_leaderboard(user_address, epoch_number);
CREATE INDEX IF NOT EXISTS idx_leaderboard_updated ON points_leaderboard(updated_at);

管理员操作日志

-- Audit trail of administrative actions on the points system.
CREATE TABLE IF NOT EXISTS points_admin_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    admin_address VARCHAR(42) NOT NULL,
    action VARCHAR(50) NOT NULL,
    -- One of: adjust_points, create_epoch, update_tier_config,
    -- recalculate, manual_settlement

    target_user VARCHAR(42),
    target_epoch INT,

    details JSONB,
    -- Example:
    -- {
    -- "before": {"total_points": 1000},
    -- "after": {"total_points": 1500},
    -- "reason": "Compensation for system error"
    -- }

    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- IF NOT EXISTS keeps the script re-runnable, matching the guarded CREATE TABLE.
CREATE INDEX IF NOT EXISTS idx_admin_logs_time ON points_admin_logs(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_admin_logs_user ON points_admin_logs(target_user, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_admin_logs_epoch ON points_admin_logs(target_epoch, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_admin_logs_action ON points_admin_logs(action, created_at DESC);

TimescaleDB 连续聚合

为加速查询,创建小时级汇总视图:

-- 小时级积分汇总 (加速历史查询)
-- Hourly roll-up of points per user / epoch / point type (speeds up history queries).
-- IF NOT EXISTS keeps the script re-runnable.
-- WITH NO DATA lets the statement run inside a transactional migration; the refresh policy
-- below materialises the data afterwards.
CREATE MATERIALIZED VIEW IF NOT EXISTS points_hourly_summary
WITH (timescaledb.continuous) AS
SELECT
    user_address,
    time_bucket('1 hour', created_at) AS hour,
    epoch_number,
    point_type,
    SUM(points) AS total_points,
    COUNT(*) AS event_count,
    MIN(created_at) AS first_event_at,
    MAX(created_at) AS last_event_at
FROM points_events
GROUP BY user_address, hour, epoch_number, point_type
WITH NO DATA;

-- Automatic refresh policy: every hour, refresh the window from 3 hours ago to 1 hour ago.
SELECT add_continuous_aggregate_policy('points_hourly_summary',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour',
    if_not_exists => TRUE);

API设计

用户端API

获取用户积分详情

GET /api/v1/points

获取当前登录用户的积分概览、明细及统计数据。

  • Auth: Required (Header: x-user-address 或 Bearer Token)

Query Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| epoch | int | No | 指定赛季编号。默认返回当前活跃 Epoch。 |

Response (200 OK):

{
"success": true,
"data": {
"user_address": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266",
"epoch_number": 1,
"epoch_status": "active",

"trading_points": "120.50",
"pnl_points": "50.00",
"holding_points": "10.00",
"referral_points": "0.00",
"staking_points": "0.00",

"total_points": "180.50",
"tier": "T2",
"tier_multiplier": "1.1",

"rank": 42,

"trading_volume": "100000.00",
"trade_count": 5,
"referral_count": 0,

"updated_at": "2026-01-14T07:20:00Z"
},
"error": null
}

获取积分排行榜

GET /api/v1/leaderboard

获取当前 Epoch 的积分排行榜。

  • Auth: Public

Query Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| epoch | int | No | 赛季编号 (默认当前Epoch) |
| type | string | No | 榜单类型 (默认total,可选: trading/pnl/holding/referral/staking) |
| limit | int | No | 返回条数上限 (默认100) |

Response (200 OK):

{
"success": true,
"data": {
"epoch_number": 1,
"rank_type": "total",
"total": 50,
"updated_at": "2026-01-14T08:00:00Z",
"entries": [
{
"rank": 1,
"user_address": "0x123...abc",
"username": null,
"points": "10000.50",
"tier": "T4"
},
{
"rank": 2,
"user_address": "0x456...def",
"username": null,
"points": "8500.00",
"tier": "T3"
}
]
},
"error": null
}

获取赛季信息

GET /api/v1/epochs

获取当前系统的赛季状态信息。

  • Auth: Public

Response (200 OK):

{
"success": true,
"data": {
"current_epoch": {
"number": 1,
"status": "active",
"start_time": "2026-01-01T00:00:00Z",
"end_time": "2026-01-30T23:59:59Z",
"duration_days": 30
}
},
"error": null
}

管理员API

获取系统配置

GET /api/v1/admin/points/config

获取当前积分系统配置。

更新系统配置

PUT /api/v1/admin/points/config

  • Body:
{
"enabled": true,
"trading_enabled": true,
"cache_ttl": 60
}

触发后台任务

POST /api/v1/admin/points/trigger

手动触发后台计算或刷新任务。

  • Query Parameters:
    • task (string): holding | staking | leaderboard

获取Epoch统计

GET /api/v1/admin/points/stats/:epoch

获取指定 Epoch 的全局统计数据。

手动调整积分

POST /api/v1/admin/points/adjust

  • Body:
{
"user_address": "0x...",
"epoch_number": 1,
"point_type": "trading",
"points": "100.0",
"reason": "Compensation"
}

重新计算用户积分

POST /api/v1/admin/points/recalculate

  • Body:
{
"user_address": "0x...",
"epoch_number": 1
}

核心服务实现

PointsService 结构

// src/services/points/mod.rs

use std::sync::Arc;
use rust_decimal::Decimal;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use sqlx::PgPool;
use std::collections::HashMap;

/// Service that calculates, persists and caches user points.
///
/// The methods shown in this document use `db` (via `self.db.pool`) for all SQL
/// and `cache` for the tier-multiplier and user-summary caches.
pub struct PointsService {
// Database handle; queries go through `self.db.pool`.
db: Arc<Database>,
// Cache layer used with `get` / `set_ex` / `del`.
cache: Arc<CacheManager>,
// NOTE(review): not used by the methods shown here; presumably feeds holding-points calculation — confirm.
position_service: Arc<PositionService>,
// NOTE(review): not used by the methods shown here; presumably feeds referral-points calculation — confirm.
referral_service: Arc<ReferralService>,
}

/// In-memory description of one points epoch (season).
///
/// Field names follow the `points_epochs` table columns of the same name.
#[derive(Debug, Clone)]
pub struct EpochInfo {
/// Sequential epoch identifier (`points_epochs.epoch_number`).
pub epoch_number: i32,
/// Start of the epoch window.
pub start_time: DateTime<Utc>,
/// End of the epoch window.
pub end_time: DateTime<Utc>,
/// Length of the epoch in days.
pub duration_days: i32,
/// Current lifecycle state.
pub status: EpochStatus,
}

/// Lifecycle state of a points epoch.
///
/// The variants correspond to the values documented for `points_epochs.status`:
/// `pending`, `active`, `ended` and `settled`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EpochStatus {
    /// Epoch has been created but has not started yet.
    Pending,
    /// Epoch is currently running.
    Active,
    /// Epoch has finished.
    Ended,
    /// Epoch has been settled.
    Settled,
}

impl EpochStatus {
    /// Returns the lowercase string stored in the `status` column for this state.
    ///
    /// Having a single mapping avoids scattering `"active"`-style literals across queries.
    pub const fn as_str(self) -> &'static str {
        match self {
            EpochStatus::Pending => "pending",
            EpochStatus::Active => "active",
            EpochStatus::Ended => "ended",
            EpochStatus::Settled => "settled",
        }
    }
}

impl std::fmt::Display for EpochStatus {
    /// Formats the status exactly as [`EpochStatus::as_str`] returns it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Per-user, per-epoch points totals; fields follow the `user_points_summary` columns of the same name.
///
/// NOTE(review): `get_user_points_cached` stores and loads this type through the cache
/// (`cache.get::<UserPointsSummary>` / `cache.set_ex`). If `CacheManager` requires
/// serde `Serialize`/`Deserialize`, those derives are missing here — confirm against the cache API.
#[derive(Debug, Clone)]
pub struct UserPointsSummary {
/// Wallet address of the user.
pub user_address: String,
/// Epoch these totals belong to.
pub epoch_number: i32,
/// Points earned from trading volume.
pub trading_points: Decimal,
/// Points earned from realized profit.
pub pnl_points: Decimal,
/// Points earned from holding open positions.
pub holding_points: Decimal,
/// Points earned from referrals.
pub referral_points: Decimal,
/// Points earned from staking.
pub staking_points: Decimal,
/// Grand total across all categories.
pub total_points: Decimal,
/// Tier name (T1..T4); `None` when no tier has been recorded.
pub tier: Option<String>,
/// Multiplier associated with the tier.
pub tier_multiplier: Decimal,
/// Cumulative trading volume in the epoch.
pub trading_volume: Decimal,
/// Number of trades counted in the epoch.
pub trade_count: i32,
/// Cumulative realized PnL recorded for the epoch.
pub realized_pnl: Decimal,
}

/// A single points award; fields follow the `points_events` columns of the same name.
#[derive(Debug, Clone)]
pub struct PointsEvent {
/// Wallet address of the user receiving the points.
pub user_address: String,
/// Epoch the award belongs to.
pub epoch_number: i32,
/// Category of the award.
pub point_type: PointType,
/// Number of points awarded.
pub points: Decimal,
/// Trade that triggered the award, if any.
pub related_trade_id: Option<Uuid>,
/// Order that triggered the award, if any.
pub related_order_id: Option<Uuid>,
/// Position that triggered the award, if any.
pub related_position_id: Option<Uuid>,
/// Referrer address (documented as referral events only).
pub referrer_address: Option<String>,
/// Free-form JSON details stored alongside the event.
pub metadata: serde_json::Value,
}

/// Category of a points award.
///
/// The variants correspond to the values documented for `points_events.point_type`:
/// `trading`, `pnl`, `holding`, `referral` and `staking`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PointType {
    /// Points from trading volume.
    Trading,
    /// Points from realized profit.
    Pnl,
    /// Points from holding open positions.
    Holding,
    /// Points from referred users.
    Referral,
    /// Points from staking.
    Staking,
}

impl PointType {
    /// Returns the lowercase string stored in the `point_type` column for this category.
    ///
    /// Having a single mapping avoids repeating `"trading"`-style literals in every query.
    pub const fn as_str(self) -> &'static str {
        match self {
            PointType::Trading => "trading",
            PointType::Pnl => "pnl",
            PointType::Holding => "holding",
            PointType::Referral => "referral",
            PointType::Staking => "staking",
        }
    }
}

impl std::fmt::Display for PointType {
    /// Formats the category exactly as [`PointType::as_str`] returns it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Errors returned by the points service.
#[derive(Debug, thiserror::Error)]
pub enum PointsError {
/// A database operation failed; carries the stringified underlying error.
#[error("Database error: {0}")]
Database(String),

/// No epoch is currently active.
#[error("No active epoch found")]
NoActiveEpoch,

/// A points calculation could not be performed; carries a description.
#[error("Invalid calculation: {0}")]
InvalidCalculation(String),

/// The referenced user does not exist; carries the identifier that was looked up.
#[error("User not found: {0}")]
UserNotFound(String),
}

交易积分计算

impl PointsService {
    /// Awards trading points to the taker of a completed trade.
    ///
    /// Formula: `price × amount × tier multiplier × 0.0001`. The multiplier is resolved from
    /// the taker's cumulative trading volume in the active epoch as recorded before this trade.
    ///
    /// The points event and the summary upsert run in one transaction, so either both are
    /// persisted or neither is. In that same transaction the summary's `tier` and
    /// `tier_multiplier` are refreshed from the post-trade cumulative volume, so the stored
    /// tier always matches the stored volume. After commit, the taker's cache entries are
    /// invalidated.
    ///
    /// Returns the points awarded for this trade.
    ///
    /// # Errors
    /// Propagates errors from `get_active_epoch`, `get_user_tier_multiplier` and
    /// `calculate_tier`; any sqlx failure is mapped to `PointsError::Database`.
    pub async fn calculate_trading_points(
        &self,
        trade: &TradeEvent,
    ) -> Result<Decimal, PointsError> {
        let epoch = self.get_active_epoch().await?;
        let volume = trade.price * trade.amount;

        // Tier multiplier of the taker.
        let tier_multiplier = self
            .get_user_tier_multiplier(&trade.taker, epoch.epoch_number)
            .await?;

        // 0.0001 points per unit of volume. Built from integer parts (1 × 10⁻⁴) so there is
        // no runtime parsing, no error path, and no dependency on `FromStr` being in scope.
        let trading_rate = Decimal::new(1, 4);
        let points = volume * tier_multiplier * trading_rate;

        // Record the award inside a transaction.
        let mut tx = self
            .db
            .pool
            .begin()
            .await
            .map_err(|e| PointsError::Database(e.to_string()))?;

        // 1. Insert the points event.
        sqlx::query(
            r#"
            INSERT INTO points_events
                (user_address, epoch_number, point_type, points, related_trade_id, metadata)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
        )
        .bind(&trade.taker)
        .bind(epoch.epoch_number)
        .bind("trading")
        .bind(points)
        .bind(trade.trade_id)
        .bind(serde_json::json!({
            "volume": volume,
            "tier_multiplier": tier_multiplier,
            "symbol": trade.symbol,
            "side": trade.side
        }))
        .execute(&mut *tx)
        .await
        .map_err(|e| PointsError::Database(e.to_string()))?;

        // 2. Upsert the summary totals and read back the new cumulative volume.
        let cumulative_volume: Decimal = sqlx::query_scalar(
            r#"
            INSERT INTO user_points_summary
                (user_address, epoch_number, trading_points, total_points,
                 trading_volume, trade_count)
            VALUES ($1, $2, $3, $3, $4, 1)
            ON CONFLICT (user_address, epoch_number)
            DO UPDATE SET
                trading_points = user_points_summary.trading_points + $3,
                total_points = user_points_summary.total_points + $3,
                trading_volume = user_points_summary.trading_volume + $4,
                trade_count = user_points_summary.trade_count + 1,
                updated_at = NOW()
            RETURNING trading_volume
            "#,
        )
        .bind(&trade.taker)
        .bind(epoch.epoch_number)
        .bind(points)
        .bind(volume)
        .fetch_one(&mut *tx)
        .await
        .map_err(|e| PointsError::Database(e.to_string()))?;

        // 3. Refresh the stored tier from the cumulative volume. Deriving it from this
        //    single trade's volume (and only on first insert) would leave the summary with a
        //    tier that does not match its own trading_volume.
        let tier = self.calculate_tier(cumulative_volume).await?;
        sqlx::query(
            r#"
            UPDATE user_points_summary
            SET tier = $3, tier_multiplier = $4
            WHERE user_address = $1 AND epoch_number = $2
            "#,
        )
        .bind(&trade.taker)
        .bind(epoch.epoch_number)
        .bind(&tier.name)
        .bind(tier.multiplier)
        .execute(&mut *tx)
        .await
        .map_err(|e| PointsError::Database(e.to_string()))?;

        tx.commit()
            .await
            .map_err(|e| PointsError::Database(e.to_string()))?;

        // 4. Invalidate cached summary and tier so the next read sees the new totals.
        self.invalidate_user_cache(&trade.taker, epoch.epoch_number).await;

        tracing::info!(
            "Calculated trading points for {}: {} points (volume: {}, tier_multiplier: {})",
            trade.taker, points, volume, tier_multiplier
        );

        Ok(points)
    }

    /// Returns the tier multiplier for a user in the given epoch.
    ///
    /// Looks in the cache first (key `tier:{user}:{epoch}`); on a miss or cache error it
    /// reads the user's cumulative `trading_volume` from `user_points_summary` (zero when
    /// the user has no row yet), resolves the tier via `calculate_tier`, and caches the
    /// multiplier for 60 seconds. Cache write failures are ignored on purpose: the cache is
    /// best-effort.
    ///
    /// # Errors
    /// Maps sqlx failures to `PointsError::Database` and propagates errors from
    /// `calculate_tier`.
    async fn get_user_tier_multiplier(
        &self,
        user_address: &str,
        epoch_number: i32,
    ) -> Result<Decimal, PointsError> {
        // Try the cache first.
        let cache_key = format!("tier:{}:{}", user_address, epoch_number);
        if let Ok(Some(cached)) = self.cache.get::<Decimal>(&cache_key).await {
            return Ok(cached);
        }

        // Cumulative trading volume of the user in this epoch.
        let volume: Option<Decimal> = sqlx::query_scalar(
            r#"
            SELECT COALESCE(trading_volume, 0)
            FROM user_points_summary
            WHERE user_address = $1 AND epoch_number = $2
            "#,
        )
        .bind(user_address)
        .bind(epoch_number)
        .fetch_optional(&self.db.pool)
        .await
        .map_err(|e| PointsError::Database(e.to_string()))?;

        // No summary row yet means the user has not traded in this epoch.
        let volume = volume.unwrap_or(Decimal::ZERO);

        // Resolve the tier for that volume.
        let tier = self.calculate_tier(volume).await?;

        // Cache for 60 seconds.
        let _ = self.cache.set_ex(&cache_key, &tier.multiplier, 60).await;

        Ok(tier.multiplier)
    }
}

PnL积分计算

impl PointsService {
    /// Awards PnL points when a position is closed.
    ///
    /// Only profit earns points: a `realized_pnl` of zero or less returns `Decimal::ZERO`
    /// without touching the database. Otherwise the award is `realized_pnl × 0.001`, recorded
    /// as a `pnl` event and added to the user's summary in a single transaction, after which
    /// the user's cache entries are invalidated.
    ///
    /// Returns the points awarded.
    ///
    /// # Errors
    /// Propagates errors from `get_active_epoch`; any sqlx failure is mapped to
    /// `PointsError::Database`.
    pub async fn calculate_pnl_points(
        &self,
        user_address: &str,
        position_id: Uuid,
        realized_pnl: Decimal,
    ) -> Result<Decimal, PointsError> {
        // Losses and break-even closes earn nothing.
        if realized_pnl <= Decimal::ZERO {
            return Ok(Decimal::ZERO);
        }

        let epoch = self.get_active_epoch().await?;

        // 0.001 points per unit of realized profit. Built from integer parts (1 × 10⁻³) so
        // there is no runtime parsing, no error path, and no dependency on `FromStr` being
        // in scope.
        let pnl_rate = Decimal::new(1, 3);
        let points = realized_pnl * pnl_rate;

        let mut tx = self
            .db
            .pool
            .begin()
            .await
            .map_err(|e| PointsError::Database(e.to_string()))?;

        // 1. Insert the points event.
        sqlx::query(
            r#"
            INSERT INTO points_events
                (user_address, epoch_number, point_type, points, related_position_id, metadata)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
        )
        .bind(user_address)
        .bind(epoch.epoch_number)
        .bind("pnl")
        .bind(points)
        .bind(position_id)
        .bind(serde_json::json!({
            "realized_pnl": realized_pnl
        }))
        .execute(&mut *tx)
        .await
        .map_err(|e| PointsError::Database(e.to_string()))?;

        // 2. Upsert the summary totals.
        sqlx::query(
            r#"
            INSERT INTO user_points_summary
                (user_address, epoch_number, pnl_points, total_points, realized_pnl)
            VALUES ($1, $2, $3, $3, $4)
            ON CONFLICT (user_address, epoch_number)
            DO UPDATE SET
                pnl_points = user_points_summary.pnl_points + $3,
                total_points = user_points_summary.total_points + $3,
                realized_pnl = user_points_summary.realized_pnl + $4,
                updated_at = NOW()
            "#,
        )
        .bind(user_address)
        .bind(epoch.epoch_number)
        .bind(points)
        .bind(realized_pnl)
        .execute(&mut *tx)
        .await
        .map_err(|e| PointsError::Database(e.to_string()))?;

        tx.commit()
            .await
            .map_err(|e| PointsError::Database(e.to_string()))?;

        // Drop cached values so the next read sees the new totals.
        self.invalidate_user_cache(user_address, epoch.epoch_number).await;

        Ok(points)
    }
}

后台任务设计

任务调度器

// src/app/workers.rs

use tokio::time::{interval, Duration};
use std::sync::Arc;

/// Spawns the periodic background tasks of the points system and returns immediately.
///
/// Three detached tokio tasks are started, each sharing `points_service`:
/// 1. holding-points batch calculation, every hour;
/// 2. leaderboard refresh, every 5 minutes;
/// 3. epoch status transition check, every minute.
///
/// A tokio interval's first tick completes immediately, so each task also runs once right
/// at startup. Failures are logged and the loop continues with the next tick.
///
/// Every ticker uses `MissedTickBehavior::Skip`: with the default (`Burst`), a run that
/// overruns its period is followed by back-to-back catch-up runs, which for these jobs is
/// only redundant load.
///
/// NOTE(review): no staking-points task is spawned here although the configuration sample
/// defines `POINTS_STAKING_CALC_INTERVAL_SECS` — confirm whether it is started elsewhere.
pub async fn start_points_workers(
    points_service: Arc<PointsService>,
) {
    // 1. Holding points calculation (hourly).
    let holding_service = Arc::clone(&points_service);
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(3600)); // 1 hour
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;

            tracing::info!("Starting holding points calculation task");
            match holding_service.calculate_holding_points_batch().await {
                Ok(_) => tracing::info!("Holding points calculation completed"),
                Err(e) => tracing::error!("Holding points calculation failed: {}", e),
            }
        }
    });

    // 2. Leaderboard refresh (every 5 minutes).
    let leaderboard_service = Arc::clone(&points_service);
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(300)); // 5 minutes
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;

            tracing::debug!("Starting leaderboard refresh task");
            match leaderboard_service.refresh_leaderboard(None).await {
                Ok(_) => tracing::debug!("Leaderboard refresh completed"),
                Err(e) => tracing::error!("Leaderboard refresh failed: {}", e),
            }
        }
    });

    // 3. Epoch status check (every minute).
    let epoch_service = Arc::clone(&points_service);
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(60)); // 1 minute
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;

            match epoch_service.check_epoch_transitions().await {
                Ok(changed) => {
                    if changed {
                        tracing::info!("Epoch status changed");
                    }
                }
                Err(e) => tracing::error!("Epoch status check failed: {}", e),
            }
        }
    });

    tracing::info!("All points system workers started successfully");
}

性能优化

Redis缓存策略

impl PointsService {
    /// Returns a user's points summary for an epoch, going through the cache.
    ///
    /// A cache hit under `points:user:{user}:epoch:{epoch}` is returned directly. On a miss
    /// (or a cache read error) the summary is loaded with `get_user_points_from_db`, written
    /// back to the cache with a 60-second TTL (write errors are ignored), and returned.
    ///
    /// # Errors
    /// Propagates errors from `get_user_points_from_db`.
    pub async fn get_user_points_cached(
        &self,
        user_address: &str,
        epoch_number: i32,
    ) -> Result<UserPointsSummary, PointsError> {
        let cache_key = format!("points:user:{}:epoch:{}", user_address, epoch_number);

        // Cache read errors are treated the same as a miss.
        let cached = self
            .cache
            .get::<UserPointsSummary>(&cache_key)
            .await
            .ok()
            .flatten();
        if let Some(hit) = cached {
            return Ok(hit);
        }

        // Miss: fall back to the database.
        let fresh = self
            .get_user_points_from_db(user_address, epoch_number)
            .await?;

        // Best-effort write-back, TTL 60 seconds.
        let _ = self.cache.set_ex(&cache_key, &fresh, 60).await;

        Ok(fresh)
    }

    /// Removes a user's cached points summary and cached tier multiplier for an epoch.
    ///
    /// Deletion errors are ignored.
    pub async fn invalidate_user_cache(&self, user_address: &str, epoch_number: i32) {
        let stale_keys = [
            format!("points:user:{}:epoch:{}", user_address, epoch_number),
            format!("tier:{}:{}", user_address, epoch_number),
        ];

        for stale_key in &stale_keys {
            let _ = self.cache.del(stale_key).await;
        }
    }
}

数据库索引优化

-- These indexes are also created next to their table definitions, so every statement uses
-- IF NOT EXISTS; a plain CREATE INDEX would fail with "relation already exists".

-- user_points_summary: core query indexes
-- (user_address, epoch_number) needs no extra index: the table's
-- UNIQUE(user_address, epoch_number) constraint already provides one.
CREATE INDEX IF NOT EXISTS idx_epoch_total_points ON user_points_summary(epoch_number, total_points DESC);

-- points_events: time-series query indexes
CREATE INDEX IF NOT EXISTS idx_user_events ON points_events(user_address, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_epoch_events ON points_events(epoch_number, created_at);

-- points_leaderboard: ranking query index
CREATE INDEX IF NOT EXISTS idx_leaderboard ON points_leaderboard(epoch_number, rank_type, rank);

-- Periodic maintenance.
-- VACUUM cannot run inside a transaction block: execute these as standalone statements
-- (for example from a scheduled job), not from a transactional migration.
VACUUM ANALYZE user_points_summary;
VACUUM ANALYZE points_events;

安全设计

权限控制

/// Axum middleware that only lets administrator requests through.
///
/// A request must carry both:
/// 1. an `Authorization: Bearer <jwt>` header that `validate_jwt` accepts with the
///    configured secret — otherwise `401 Unauthorized`;
/// 2. an `x-admin-api-key` header equal to the configured admin API key — otherwise
///    `403 Forbidden`.
///
/// The key is compared in constant time so response timing does not reveal how many leading
/// characters of a guess were correct. An empty configured key never matches, so a missing
/// configuration value cannot be satisfied by sending an empty header.
///
/// NOTE(review): the address returned by `validate_jwt` is not checked against any admin
/// list here; the shared API key is the only admin-specific check — confirm this is intended.
pub async fn admin_only_middleware(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    req: Request,
    next: Next,
) -> Result<Response, (StatusCode, Json<ApiResponse<()>>)> {
    // 1. Validate the JWT.
    let token = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.strip_prefix("Bearer "))
        .ok_or_else(|| {
            (
                StatusCode::UNAUTHORIZED,
                Json(ApiResponse::<()>::error("Missing authorization")),
            )
        })?;

    // Only the validity of the token matters here; the decoded address is not used.
    let _admin_address = validate_jwt(token, &state.config.jwt_secret).map_err(|_| {
        (
            StatusCode::UNAUTHORIZED,
            Json(ApiResponse::<()>::error("Invalid token")),
        )
    })?;

    // 2. Validate the admin API key.
    let api_key = headers
        .get("x-admin-api-key")
        .and_then(|v| v.to_str().ok())
        .ok_or_else(|| {
            (
                StatusCode::FORBIDDEN,
                Json(ApiResponse::<()>::error("Admin API key required")),
            )
        })?;

    let expected_key = state.config.admin_api_key.as_bytes();
    if expected_key.is_empty() || !constant_time_eq(api_key.as_bytes(), expected_key) {
        return Err((
            StatusCode::FORBIDDEN,
            Json(ApiResponse::<()>::error("Invalid admin key")),
        ));
    }

    Ok(next.run(req).await)
}

/// Compares two byte strings without returning early at the first differing byte.
///
/// Only the lengths are compared up front; for equal lengths every byte pair is examined.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |diff, (x, y)| diff | (x ^ y)) == 0
}

防止积分作弊

impl PointsService {
    /// Flags a user whose recent trading activity looks like wash trading.
    ///
    /// Counts the user's `trading` events of the last hour in the given epoch, grouped by
    /// symbol and side, and returns `Ok(true)` (after logging a warning) as soon as any group
    /// exceeds 100 events; otherwise `Ok(false)`.
    ///
    /// Symbol and side are not columns of `points_events`; the trading-points writer stores
    /// them in the `metadata` JSON, so they are read from there. Events without a `symbol`
    /// in their metadata are left out of the count.
    ///
    /// # Errors
    /// Maps sqlx failures to `PointsError::Database`.
    pub async fn detect_wash_trading(
        &self,
        user_address: &str,
        epoch_number: i32,
    ) -> Result<bool, PointsError> {
        // Per (symbol, side) event counts over the last hour.
        let recent_trades: Vec<(String, String, i64)> = sqlx::query_as(
            r#"
            SELECT
                metadata->>'symbol' AS symbol,
                COALESCE(metadata->>'side', '') AS side,
                COUNT(*) AS count
            FROM points_events
            WHERE user_address = $1
              AND epoch_number = $2
              AND point_type = 'trading'
              AND created_at > NOW() - INTERVAL '1 hour'
              AND metadata->>'symbol' IS NOT NULL
            GROUP BY 1, 2
            "#,
        )
        .bind(user_address)
        .bind(epoch_number)
        .fetch_all(&self.db.pool)
        .await
        .map_err(|e| PointsError::Database(e.to_string()))?;

        // The first group above the threshold is enough to flag the user.
        let suspicious = recent_trades
            .into_iter()
            .find(|(_, _, count)| *count > 100);

        if let Some((symbol, _, count)) = suspicious {
            tracing::warn!(
                "Suspicious trading pattern: {} made {} trades on {} in 1 hour",
                user_address, count, symbol
            );
            return Ok(true);
        }

        Ok(false)
    }
}

测试策略

单元测试

// Unit tests for the points calculations.
#[cfg(test)]
mod tests {
use super::*;
use rust_decimal_macros::dec;

// Trading points for a single trade of price 100000 and amount 1.0.
#[tokio::test]
async fn test_trading_points_calculation() {
let service = create_test_service().await;

// NOTE(review): `calculate_trading_points` also reads `trade.side`, which this literal does
// not set — confirm the `TradeEvent` definition and add the missing field(s).
let trade = TradeEvent {
trade_id: Uuid::new_v4(),
symbol: "BTCUSDT".to_string(),
price: dec!(100000),
amount: dec!(1.0),
taker: "0xtest".to_string(),
};

let points = service.calculate_trading_points(&trade).await.unwrap();

// 100000 * 1.1 * 0.0001 = 11
// NOTE(review): the 1.1 multiplier presumes the taker already sits in tier T2. The multiplier
// is resolved from the volume recorded before this trade, so a user with no prior volume
// would get 1.0 and 10 points (the integration test expects 10.0) — confirm what
// `create_test_service` seeds.
assert_eq!(points, dec!(11.0));
}

// A negative realized PnL must yield zero points.
#[tokio::test]
async fn test_pnl_points_negative() {
let service = create_test_service().await;

// A loss earns no points
let points = service
.calculate_pnl_points("0xtest", Uuid::new_v4(), dec!(-1000))
.await
.unwrap();

assert_eq!(points, dec!(0));
}
}

集成测试

// End-to-end check: one trade by a fresh user is reflected in that user's epoch summary.
// NOTE(review): `dec!` is used here but `rust_decimal_macros::dec` is only imported inside the
// unit-test module — confirm it is in scope for this test.
#[tokio::test]
async fn test_full_trading_flow() {
let test_env = setup_test_environment().await;

let user = "0xtest_user";
let epoch = create_test_epoch(1, 30).await;

// NOTE(review): `calculate_trading_points` reads `trade_id`, `symbol` and `side` as well, which
// this literal does not set — confirm the `TradeEvent` definition and add the missing fields.
let trade = TradeEvent {
taker: user.to_string(),
price: dec!(100000),
amount: dec!(1.0),
};

test_env.points_service
.calculate_trading_points(&trade)
.await
.unwrap();

let summary = test_env.points_service
.get_user_points(user, epoch.epoch_number)
.await
.unwrap();

// 100000 * 1.0 * 0.0001 = 10: a user with no prior volume gets the base multiplier.
assert_eq!(summary.trading_points, dec!(10.0));
assert_eq!(summary.total_points, dec!(10.0));
}

技术栈

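| 组件 | 技术 | 版本 |
| --- | --- | --- |
| 编程语言 | Rust | 1.75+ |
| Web框架 | Axum | 0.7 |
| 数据库 | PostgreSQL | 15+ |
| 时序数据库 | TimescaleDB | 2.13+ |
| 缓存 | Redis | 7.0+ |
| 数据库访问层 | SQLx | 0.7 |
| 异步运行时 | Tokio | 1.x |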

配置示例

# New .env settings

# Points System
POINTS_HOLDING_CALC_INTERVAL_SECS=3600 # holding-points calculation interval (seconds)
POINTS_STAKING_CALC_INTERVAL_SECS=86400 # staking-points calculation interval (seconds)
POINTS_LEADERBOARD_REFRESH_SECS=300 # leaderboard refresh interval (seconds)
POINTS_CACHE_TTL_SECS=60 # points cache TTL (seconds)
POINTS_ENABLE_ANTI_CHEAT=true # enable anti-cheat detection
POINTS_ADMIN_AUDIT_LOG=true # enable the admin audit log