跳到主要内容

servers_v2 差距补全与优化 — 实施计划(历史归档快照)

Status

Superseded

Date

2026-03-23

Superseded By

  • docs/architecture/migration-readiness.md
  • docs/architecture/service-catalog.md
  • docs/architecture/http-entrypoints.md
  • docs/architecture/deployment-topology.md
  • docs/services/
  • docs/plans/recon/2026-04-22-recon-service-implementation.md
  • docs/plans/worker-health/2026-04-22-worker-health-and-settlement-hardening.md

Important Note

This file is retained as a historical execution snapshot only.

It predates:

  • the committed servers_v2/docker-compose.yml stack
  • the documented recon_service extraction
  • the worker-health hardening work
  • the current docs/architecture/* and docs/services/* source-of-truth set

Do not execute this file as a current plan. Use the superseding documents above instead.

Historical Content

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: 补全 servers_v2 相比 servers 的所有确认缺失功能,并完成架构优化,使 v2 达到可部署、功能对等、架构更优的状态。

Architecture: servers_v2 是基于 DDD 的微服务架构,8 个服务共享 PostgreSQL + Redis。gateway 处理玩家端流量,admin/agent 直连。所有资金操作通过 wallet_service(单一资金写入者)。服务间通过 outbox/inbox 模式的 Redis Streams 异步通信。

Tech Stack: Python 3.13+, FastAPI, SQLAlchemy 2.0 (asyncpg), Redis, Docker Compose, uv (package manager), pytest

Historical Spec: docs/reference/history/migrations/2026-03-23-servers-v2-gap-analysis-design.md

HTTP 客户端标准化说明: v2 中 admin_service 和 game_service 使用 aiohttp.ClientSession(带熔断器),promotion_service 和 rolling_service 使用裸 httpx(无熔断器)。本计划统一标准化为 aiohttp:Task 2(新 PlisioClient 使用 aiohttp)和 Task 9(promotion/rolling client 从 httpx 迁移到 aiohttp + 熔断器)共同完成此标准化。

已知不在范围内的差距: SMS 调用模式迁移(从外部 sms_url HTTP 调用切换为纯 SmsService 库模式)被有意延后,不在本计划的 17 个任务中。


Phase 1: 部署就绪(高优先级)

Task 1: 创建 v2 Docker Compose

Files:

  • Create: servers_v2/docker-compose.yml
  • Create: servers_v2/Dockerfile (共用模板)

Historical Context: 当时仓库尚未提交当前版本的 Compose 本地栈,因此这里记录了早期补全步骤。

端口规范(见 servers_v2/CLAUDE.md):

  • gateway: 8080
  • player_service: 8010
  • wallet_service: 8011
  • game_service: 8012
  • rolling_service: 8013
  • promotion_service: 8014
  • agent_service: 8015
  • admin_service: 8016

每个服务的 dev.env 已有环境变量模板。pyproject.toml 中的 uv.sources 引用了本地共享包 rgb-dbrgb-contracts(路径 ../../servers/shared/)。

  • Step 1: 创建共用 Dockerfile

注意: Docker build context 必须设为仓库根目录(不是 servers_v2/),因为 Dockerfile 需要 COPY servers/shared/ 目录(共享包)。docker-compose.yml 中 context: . 应指向仓库根。

# servers_v2/Dockerfile
FROM python:3.13-slim@sha256:a0779d7c12fc20be6ec6b4ddc901a4fd7657b8a6bc9def9d3fde89ed5efe0a3d

WORKDIR /app

# Install uv
COPY --from=ghcr.io/astral-sh/uv:0.10.6@sha256:2f2ccd27bbf953ec7a9e3153a4563705e41c852a5e1912b438fc44d88d6cb52c /uv /usr/local/bin/uv

# Copy shared packages first (for caching)
COPY servers/shared/rgb_db /shared/rgb_db
COPY servers/shared/contracts /shared/contracts

# Copy service code
ARG SERVICE_DIR
COPY servers_v2/${SERVICE_DIR} /app

# Override uv.sources paths for container layout
# uv needs the shared packages accessible
RUN uv sync --frozen 2>/dev/null || uv sync

EXPOSE 8000
CMD ["uv", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
  • Step 2: 创建 docker-compose.yml

参考 servers_v2/CLAUDE.md 端口规范和各服务 dev.env 中的环境变量。关键点:

  • 所有服务共享同一 PostgreSQL 和 Redis

  • gateway 的下游 URL 指向 docker-compose 内部服务名

  • admin_service 和 agent_service 不经 gateway(独立端口映射)

  • 每个服务需要 DATABASE_URLREDIS_URL,部分需要 WALLET_SERVICE_URL

  • 共享包通过 volume mount 挂载到容器内

  • Step 3: 测试 docker-compose up

cd servers_v2
docker-compose up -d
docker-compose ps # 确认所有 8 个服务 + postgres + redis 正常
docker-compose logs gateway --tail=20 # 检查无错误
curl http://localhost:8080/health # gateway 健康检查
  • Step 4: Commit
git add servers_v2/docker-compose.yml servers_v2/Dockerfile
git commit -m "feat: add docker-compose for servers_v2 with all 8 services"

Task 2: 恢复 Crypto Deposit 地址分配

Files:

  • Create: servers_v2/wallet_service/app/services/plisio_client.py
  • Modify: servers_v2/wallet_service/app/api/routes/queries.py:460-520
  • Modify: servers_v2/wallet_service/app/core/events.py (init plisio client)
  • Test: servers_v2/wallet_service/tests/test_plisio_client.py

Context: v1 wallet_server/app/services/plisio.py 有完整的 PlisioService 类,在 deposit/gate/create 时调用 create_order_by_coin() 拿到 wallet_hash。v2 的 queries.py:514 返回空字符串占位。需要将 v1 的 Plisio API 调用逻辑迁移到 v2。

v1 PlisioService 关键方法:

  • get_exchange_rates(fiat_currency="KRW") → GET /currencies/{fiat}
  • create_order_by_coin(cid, amount, order_number, expire_min) → GET /invoices/new
  • get_details(order_id) → GET /invoices/{order_id}

v2 已有 PLISIO_SECRET 设置(app/core/settings/app.py:25),callback/agree 也已实现(plisio.py:207/394)。

  • Step 1: 编写 PlisioClient 测试

创建 servers_v2/wallet_service/tests/test_plisio_client.py,测试纯函数部分(URL 构建、响应解析)。参考 v1 servers/wallet_server/app/services/plisio.py 的接口。

  • Step 2: 运行测试确认失败
cd servers_v2/wallet_service
uv run pytest tests/test_plisio_client.py -v
  • Step 3: 实现 PlisioClient

创建 servers_v2/wallet_service/app/services/plisio_client.py,从 v1 plisio.py 迁移以下方法:

  • __init__(api_key, base_url="https://plisio.net/api/v1")

  • async create_order_by_coin(cid, amount, order_number, expire_min) — 调用 /invoices/new,然后 get_details() 获取 wallet_hash

  • async get_details(order_id) — 调用 /invoices/{order_id}

  • 使用 aiohttp 而非 httpx(与 v2 其他客户端一致)

  • Step 4: 运行测试确认通过

cd servers_v2/wallet_service
uv run pytest tests/test_plisio_client.py -v
  • Step 5: 修改 deposit/gate/create 流程

修改 servers_v2/wallet_service/app/api/routes/queries.py 中的 gate_create() 函数(约 line 460-520):

  • 在创建 coin_transaction 记录之前,调用 plisio_client.create_order_by_coin()
  • 用返回的 wallet_hashpayment_url 回填 wallet_addressweb_url
  • 如果 Plisio 调用失败,将 deposit 状态设为 expired 并返回错误

参考 v1 deposit.py:659-747 的流程。

  • Step 6: 在 events.py 中初始化 PlisioClient

app/core/events.py 的 startup handler 中创建 PlisioClient 实例并挂到 app.state.plisio_client

  • Step 7: Commit
git add servers_v2/wallet_service/app/services/plisio_client.py \
servers_v2/wallet_service/app/api/routes/queries.py \
servers_v2/wallet_service/app/core/events.py \
servers_v2/wallet_service/tests/test_plisio_client.py
git commit -m "feat: restore Plisio API integration for crypto deposit address allocation"

Task 3: 确认并实现 SBO 下游路由

Files:

  • Create: servers_v2/game_service/app/api/routes/sbo_callback.py(如 SBO 仍在使用)
  • Modify: servers_v2/game_service/app/api/routes/api.py:18

Context: gateway provider_routes.py:106 代理 /sbo/* 到 game_service,但 api.py:18 未注册 /sbo 路由。v1 也没有独立的 sbo_callback.py。需要先确认 SBO 是否仍在使用。

  • Step 1: 检查 SBO 是否在使用

搜索数据库和配置:

# 检查 provider 表是否有 SBO 记录
cd servers_v2/admin_service
grep -r "sbo\|SBO" app/ --include="*.py" | head -20

# 检查 v1 中 SBO 的实际使用情况
grep -r "sbo\|SBO" /Users/saber/Dev/RGB/servers/game_server/app/ --include="*.py" | head -20
  • Step 2: 如果 SBO 在使用,创建 sbo_callback.py

参考最相近的现有回调文件(如 bti_callback.py),创建 sbo_callback.py。SBO 的回调格式通常与 BTI 类似(validatetoken, reserve, debitreserve 等)。

  • Step 3: 注册路由到 api.py

servers_v2/game_service/app/api/routes/api.py 中添加:

from app.api.routes import sbo_callback
router.include_router(sbo_callback.router, tags=["SBO Callback"], prefix="/sbo")
  • Step 4: 如果 SBO 不在使用,移除 gateway 入口

servers_v2/gateway/app/api/routes/provider_routes.py:106 移除 /sbo/* 代理规则。

  • Step 5: Commit
git add servers_v2/game_service/app/api/routes/sbo_callback.py \
servers_v2/game_service/app/api/routes/api.py
git commit -m "feat: add SBO callback route to game_service"
# 或
git commit -m "fix: remove unused SBO gateway route"

Task 4: 清理 TRON 占位代码

Files:

  • Modify: servers_v2/admin_service/app/tasks/tron_tasks.py (删除或清空)
  • Modify: servers_v2/admin_service/app/core/events.py:68,98-100 (移除调度)
  • Modify: servers_v2/admin_service/app/api/routes/tron.py (添加弃用注释)

Context: TRON 已被 Plisio 替代(servers_v2/CLAUDE.md:110-111)。tron_tasks.py 有 3 个 placeholder 函数(循环体为 pass),每 30 秒/1 小时/5 分钟白跑。tron.py 路由有 17 个端点但都是占位实现(随机 hash 代替真实地址)。

  • Step 1: 移除 TRON 定时任务调度

修改 servers_v2/admin_service/app/core/events.py,移除 monitor_depositscollect_fundscheck_energy 三个任务的 APScheduler 注册。

  • Step 2: 清空 tron_tasks.py

将文件内容替换为弃用说明:

"""TRON tasks — DEPRECATED.

TRON integration has been replaced by Plisio for all crypto deposit/withdrawal
operations. See servers_v2/CLAUDE.md for details.

These placeholder tasks have been removed to avoid unnecessary background work.
"""
  • Step 3: 在 tron.py 路由顶部添加弃用说明

不删除路由(admin 面板可能仍在引用),但在文件顶部添加明确的弃用注释说明地址生成用的是随机 hash 而非真实 tronpy 派生。

  • Step 4: 更新 admin_service CLAUDE.md

将 TRON 相关任务标记从 "PLACEHOLDER" 改为 "REMOVED — replaced by Plisio"。

  • Step 5: Commit
git add servers_v2/admin_service/app/tasks/tron_tasks.py \
servers_v2/admin_service/app/core/events.py \
servers_v2/admin_service/app/api/routes/tron.py \
servers_v2/admin_service/CLAUDE.md
git commit -m "chore: remove TRON placeholder tasks, mark routes as deprecated"

Phase 2: 功能补全(中优先级)

Task 5: 补全玩家等级域名操作

Files:

  • Modify: servers_v2/admin_service/app/api/routes/player.py

Context: v1 有 player_level/domain.py 提供 POST /domain/addPOST /domain/set_primary。v2 的 player.py 已有其他玩家管理端点但缺少域名操作。

v1 逻辑(源文件:servers/admin_server/app/api/routes/player_level/domain.py,仓库:servers/admin_server/app/db/repositories/player_level_domain.py):

  • domain/add (domain.py:45-104): 验证域名格式 → 检查唯一性 → 插入 player_level_domain 表 → 如果是第一条则设为 primary

  • domain/set_primary (domain.py:161-210): 清除该 level 下所有 is_primary=0 → 设置目标域名 is_primary=1

  • Step 1: 在 player.py 末尾添加域名端点

添加以下端点(使用 v2 的 text SQL 风格,参考 player.py 中已有的 USDT 地址端点模式):

  • POST /players/level-domains — 添加等级域名

  • POST /players/level-domains/set-primary — 设置主域名

  • GET /players/level-domains — 列出等级域名

  • Step 2: 测试端点

cd servers_v2/admin_service
uv run python -c "from app.main import app; print('OK')" # 验证导入无错
  • Step 3: Commit
git add servers_v2/admin_service/app/api/routes/player.py
git commit -m "feat: add player level domain endpoints (add, set-primary, list)"

Task 6: 补全玩家注册审核

Files:

  • Modify: servers_v2/admin_service/app/api/routes/player.py

Context: v1 有 POST /player-registrations/{id}/actions/audit,审核通过时:更新 register_status → 发放优惠券 → 打标签 → 发消息。

  • Step 1: 在 player.py 添加审核端点

POST /players/registrations/{registration_id}/audit

  • 接收 {status, reason, operator}

  • 更新 player.register_status

  • 如果通过 (status=1),执行以下副作用(参考 v1 servers/admin_server/app/api/routes/player_registrations.py:33-89):

    • 发放注册优惠券(INSERT coupon_log)
    • 给玩家打标签(INSERT player_tag_rel)
    • 增加分组计数(UPDATE player_group SET cnt = cnt + 1)
    • 发送站内消息
  • 记录操作日志(INSERT admin_log)

  • Step 2: 测试导入

cd servers_v2/admin_service
uv run python -c "from app.main import app; print('OK')"
  • Step 3: Commit
git add servers_v2/admin_service/app/api/routes/player.py
git commit -m "feat: add player registration audit endpoint"

Task 7: 优惠券 Excel 批量导入/导出(如业务需要)

Files:

  • Modify: servers_v2/admin_service/app/api/routes/promotion.py

Context: v2 已有完整的优惠券发送功能(promotion.py:763),但可能缺少 Excel 批量导入(从 Excel 读取账号列表批量发送优惠券)和导出(将发送记录导出为 Excel)。spec 将此标为"待确认"。

  • Step 1: 确认 v1 是否有 Excel 功能
grep -r "excel\|xlsx\|openpyxl\|pandas.*read_excel\|to_excel" \
/Users/saber/Dev/RGB/servers/admin_server/app/ --include="*.py" | head -10
  • Step 2: 确认 admin 面板是否使用 Excel 导入
grep -r "excel\|xlsx\|upload.*file\|import.*coupon" \
/Users/saber/Dev/RGB/bo/admin/src/ --include="*.vue" --include="*.js" --include="*.ts" | head -10
  • Step 3: 如需要,在 promotion.py 添加 Excel 端点

  • POST /coupon/send/excel — 接受 multipart/form-data 上传 Excel 文件,解析账号列表,批量发送优惠券

  • GET /coupon/admin/logs/export — 将发送记录导出为 Excel

使用 pandasopenpyxl(admin_service 已依赖 pandas)。

  • Step 4: Commit
git add servers_v2/admin_service/app/api/routes/promotion.py
git commit -m "feat: add coupon Excel batch import/export endpoints"

Task 8: 实现 Agent 密码重置(原 Task 7)

Files:

  • Modify: servers_v2/agent_service/app/api/routes/auth.py:207-210

Context: 当前 POST /agent/forget 返回 501。v1 的 user.py 有密码重置流程(发 SMS → 验证 → 更新密码)。v2 agent_service 已有 send_sms(sms_url, ...) 调用模式(auth.py:369)。

  • Step 1: 实现 forget 端点

替换 auth.py:207-210 的 501 响应,参考 v1 逻辑和 v2 中已有的 edit/phone/send-sms 模式:

  • 根据 agent account 查找记录
  • 调用 send_sms() 发送验证码
  • 前端后续调用 verify + reset 端点(如果 v2 有的话)

注意:如果 v2 只需要实现发送验证码部分(前端再调 edit/password),那实现范围更小。

  • Step 2: 测试导入
cd servers_v2/agent_service
uv run python -c "from app.main import app; print('OK')"
  • Step 3: Commit
git add servers_v2/agent_service/app/api/routes/auth.py
git commit -m "feat: implement agent password reset (POST /forget)"

Task 9: 补全游戏同步端点(如仍需要)

Files:

  • Modify: servers_v2/game_service/app/api/routes/integration.py

Context: v1 有 POST /games_sync/wcPOST /games_sync/mgPOST /wc/transaction_detailsPOST /mg/transaction_detailsGET /mg_games。这些是管理/维护工具。

  • Step 1: 确认是否仍需要

检查 admin 面板是否调用这些端点:

grep -r "games_sync\|transaction_details\|mg_games" /Users/saber/Dev/RGB/bo/ --include="*.js" --include="*.vue" --include="*.ts" | head -10
  • Step 2: 如需要,在 integration.py 中添加端点

从 v1 servers/game_server/app/api/routes/integration.py 迁移对应逻辑。主要是调用供应商 API 同步游戏列表到 game 表。

  • Step 3: Commit
git add servers_v2/game_service/app/api/routes/integration.py
git commit -m "feat: add game sync endpoints for WC and MG providers"

Phase 3: 架构优化(中优先级)

Task 10: 熔断器标准化

Files:

  • Create: servers/shared/contracts/src/rgb_contracts/infra/circuit_breaker.py
  • Modify: servers_v2/promotion_service/app/services/wallet_client.py
  • Modify: servers_v2/rolling_service/app/services/wallet_client.py
  • Modify: servers_v2/promotion_service/app/services/rolling_client.py
  • Test: servers/shared/contracts/tests/test_circuit_breaker.py

Context: admin_servicegame_service 的 wallet_client 有完整的 _CircuitBreaker 类(threshold=5, recovery=10s, time.monotonic())。promotion_servicerolling_service 的 client 是裸 httpx 薄封装,无熔断/重试/连接池。

目标:将 _CircuitBreaker 提取到 rgb_contracts.infra.circuit_breaker,然后在 promotion/rolling 的 client 中引入。

  • Step 1: 编写 CircuitBreaker 测试

创建 servers/shared/contracts/tests/test_circuit_breaker.py

import time
from rgb_contracts.infra.circuit_breaker import CircuitBreaker

def test_initially_closed():
cb = CircuitBreaker(threshold=3, recovery_s=1.0)
assert not cb.is_open

def test_opens_after_threshold_failures():
cb = CircuitBreaker(threshold=3, recovery_s=1.0)
cb.failure()
cb.failure()
assert not cb.is_open
cb.failure()
assert cb.is_open

def test_resets_on_success():
cb = CircuitBreaker(threshold=2, recovery_s=1.0)
cb.failure()
cb.success()
cb.failure()
assert not cb.is_open # reset by success

def test_half_open_after_recovery():
cb = CircuitBreaker(threshold=1, recovery_s=0.1)
cb.failure()
assert cb.is_open
time.sleep(0.15)
assert not cb.is_open # recovery window passed
  • Step 2: 运行测试确认失败
cd servers/shared/contracts
uv run pytest tests/test_circuit_breaker.py -v
  • Step 3: 实现 CircuitBreaker 模块

创建 servers/shared/contracts/src/rgb_contracts/infra/circuit_breaker.py

admin_service/app/services/wallet_client.py:35-57 提取 _CircuitBreaker 类,重命名为 CircuitBreaker(公开类),保持相同逻辑。

  • Step 4: 运行测试确认通过
cd servers/shared/contracts
uv run pytest tests/test_circuit_breaker.py -v
  • Step 5: 升级 promotion_service wallet_client

修改 servers_v2/promotion_service/app/services/wallet_client.py

  • 替换裸 httpx 为 aiohttp session

  • 引入 from rgb_contracts.infra.circuit_breaker import CircuitBreaker

  • 添加重试逻辑(exponential backoff, 3 次)

  • 保持现有方法签名不变

  • Step 6: 升级 rolling_service wallet_client

同样修改 servers_v2/rolling_service/app/services/wallet_client.py

  • Step 7: 升级 promotion_service rolling_client

同样修改 servers_v2/promotion_service/app/services/rolling_client.py

  • Step 8: 运行各服务现有测试确认无回归
cd servers_v2/promotion_service && uv run pytest tests/ -v
cd servers_v2/rolling_service && uv run pytest tests/ -v
  • Step 9: Commit
git add servers/shared/contracts/src/rgb_contracts/infra/circuit_breaker.py \
servers/shared/contracts/tests/test_circuit_breaker.py \
servers_v2/promotion_service/app/services/wallet_client.py \
servers_v2/rolling_service/app/services/wallet_client.py \
servers_v2/promotion_service/app/services/rolling_client.py
git commit -m "feat: extract CircuitBreaker to shared contracts, apply to promotion/rolling clients"

Task 11: 增加健康检查和可观测性

Files:

  • Create: servers/shared/contracts/src/rgb_contracts/infra/health.py
  • Create: servers/shared/contracts/src/rgb_contracts/infra/request_id.py
  • Modify: 6 个服务的 app/api/routes/api.py(添加 /health)
  • Modify: servers_v2/gateway/app/services/proxy.py(传播 X-Request-ID)
  • Modify: servers_v2/gateway/app/middleware/auth.py(生成 X-Request-ID)

Context: gateway 和 admin_service 已有 /health。其他 6 个服务没有。admin_service 的 /health 检查 DB + Redis + wallet client + scheduler。

  • Step 1: 创建共享健康检查辅助模块

servers/shared/contracts/src/rgb_contracts/infra/health.py

async def check_db(pool) -> dict:
try:
async with pool() as session:
await session.execute(text("SELECT 1"))
return {"status": "ok"}
except Exception as e:
return {"status": "error", "error": str(e)}

async def check_redis(redis) -> dict:
try:
await redis.ping()
return {"status": "ok"}
except Exception as e:
return {"status": "error", "error": str(e)}
  • Step 2: 为 6 个服务添加 /health 端点

wallet_serviceplayer_servicegame_servicerolling_servicepromotion_serviceagent_serviceapi.py 中添加 /health 端点,调用共享辅助模块。

  • Step 3: 在 gateway 添加 X-Request-ID 生成和传播

修改 gateway/app/middleware/auth.py:在请求进入时生成 X-Request-ID(如果 header 中没有的话),写入 request.state.request_id

修改 gateway/app/services/proxy.py:在转发 header 列表中添加 x-request-id

  • Step 4: Commit
git add servers/shared/contracts/src/rgb_contracts/infra/health.py \
servers_v2/*/app/api/routes/api.py \
servers_v2/gateway/app/services/proxy.py \
servers_v2/gateway/app/middleware/auth.py
git commit -m "feat: add /health endpoints to all services, propagate X-Request-ID"

Task 12: 将 admin_service 后台任务提取为独立 worker

Files:

  • Create: servers_v2/admin_service/worker.py
  • Modify: servers_v2/admin_service/app/core/events.py
  • Modify: servers_v2/docker-compose.yml

Context: admin_service 的 events.py 注册了 12+ 个 APScheduler 任务(vera sync、stat aggregation、tag、level、attendance、expire、tron)。这使得 admin_service 同时是 API 服务器和任务调度器。

目标:将 APScheduler 任务提取到独立的 worker 进程,admin_service API 不再运行定时任务。

  • Step 1: 创建 worker.py 入口

创建 servers_v2/admin_service/worker.py

  • 加载 Settings(同 app 使用的 settings)

  • 初始化 DB 连接池(build_runtime())和 Redis 连接

  • 初始化 WalletClient 和 RollingClient(部分任务如 stat.pytag.py 依赖 app.state.wallet_client

  • 将上述资源挂到一个伪 app.state 对象上(或创建简单的 WorkerContext dataclass)

  • 注册所有 APScheduler 定时任务

  • asyncio.run() 启动事件循环

  • 不启动 FastAPI/uvicorn

  • Step 2: 修改 events.py 支持可选调度

添加环境变量 ENABLE_SCHEDULER=true/false,默认 true(向后兼容)。当 false 时跳过 APScheduler 初始化。

  • Step 3: 在 docker-compose 中添加 worker 服务
admin_worker:
build:
context: .
dockerfile: Dockerfile
args:
SERVICE_DIR: admin_service
command: uv run python worker.py
environment:
ENABLE_SCHEDULER: "true"
depends_on:
- postgres
- redis

admin_service API 服务设置 ENABLE_SCHEDULER=false

  • Step 4: Commit
git add servers_v2/admin_service/worker.py \
servers_v2/admin_service/app/core/events.py \
servers_v2/docker-compose.yml
git commit -m "feat: extract admin scheduled tasks to standalone worker process"

Task 13: 添加 Cloudflare R2 文件上传(如需要)

Files:

  • Create: servers/shared/contracts/src/rgb_contracts/infra/storage.py
  • Modify: servers_v2/admin_service/app/core/settings/app.py

Context: v1 有 admin_server/services/cf_r2.py。如果 admin 面板需要上传文件(banner 图片等),需要实现。

  • Step 1: 确认是否需要
grep -r "cf_r2\|r2\|upload\|file.*upload" /Users/saber/Dev/RGB/bo/admin/src/ --include="*.vue" --include="*.js" --include="*.ts" | head -10
  • Step 2: 如需要,从 v1 迁移 R2 客户端

参考 servers/admin_server/app/services/cf_r2.py,创建共享的 S3 兼容存储客户端(R2 兼容 S3 API)。

  • Step 3: Commit
git add servers/shared/contracts/src/rgb_contracts/infra/storage.py
git commit -m "feat: add Cloudflare R2 storage client"

Phase 4: 长期 DDD(低优先级)

Task 14: 按服务隔离数据库 schema

Files:

  • Create: servers/shared/rgb_db/migrations/versions/0013_create_wallet_schema.py(接续现有 0012)
  • Modify: 各服务的 app/core/settings/app.py(添加 schema 配置)

Context: 当前所有服务共享一个 PostgreSQL 数据库的默认 schema。wallet_service 直接 SELECT FOR UPDATE + UPDATE player 表(player_service 的 owned 表)。

阶段目标:在同一数据库内创建 per-service PostgreSQL schemas(不拆分数据库),先从 wallet_service 开始。

  • Step 1: 创建 wallet schema migration

新增 Alembic migration(版本号 0013,接续现有的 0012_agent_message_read_table.py):

  • 创建 wallet schema

  • wallet_idempotencywallet_outbox 表移到 wallet schema

  • 保持 player 表在 public schema(player_service 拥有)

  • Step 2: 更新 wallet_service 连接配置

在 settings 中添加 search_path 包含 wallet, public

  • Step 3: 测试 wallet_service 仍能正常工作
cd servers_v2/wallet_service
uv run pytest tests/ -v
  • Step 4: Commit
git add servers/shared/rgb_db/migrations/versions/NNNN_create_wallet_schema.py
git commit -m "feat: create wallet schema for data isolation (phase 1)"

Task 15: 共享 ORM 解耦(每服务独立模型)

Files:

  • 各服务的 app/models/ 目录

Context: 所有服务 import from rgb_db.models.main。目标是每个服务只导入自己 own 的表模型,对其他服务的表使用 raw SQL 查询。

这是渐进式重构,从影响最小的服务开始:

  • Step 1: 审计各服务的 ORM 使用情况
for svc in wallet_service rolling_service promotion_service game_service player_service admin_service agent_service; do
echo "=== $svc ==="
grep -r "from rgb_db" servers_v2/$svc/app/ --include="*.py" | head -5
done
  • Step 2: 从 rolling_service 开始解耦

rolling_service 主要需要 PlayerRolling 模型。将其 ORM 访问改为 text SQL(已有部分使用 text SQL 的先例)。

  • Step 3: 逐步对其他服务执行相同操作

  • Step 4: Commit per service


Task 16: admin/agent 服务集成测试

Files:

  • Create: servers_v2/admin_service/tests/test_admin_integration.py
  • Create: servers_v2/agent_service/tests/test_agent_integration.py

Context: wallet/rolling/promotion/game/player 各有集成测试,admin 和 agent 只有单元测试。

  • Step 1: 创建 admin_service 集成测试

测试关键流程:

  • 玩家列表查询 + 分页

  • 充值审核(调用 wallet_client mock)

  • 统计查询

  • Step 2: 创建 agent_service 集成测试

测试关键流程:

  • Agent 登录

  • 玩家列表

  • 提现请求

  • Step 3: 运行测试

cd servers_v2/admin_service && uv run pytest tests/test_admin_integration.py -v
cd servers_v2/agent_service && uv run pytest tests/test_agent_integration.py -v
  • Step 4: Commit
git add servers_v2/admin_service/tests/test_admin_integration.py \
servers_v2/agent_service/tests/test_agent_integration.py
git commit -m "test: add integration tests for admin and agent services"

Task 17: 跨服务 E2E 测试套件

Files:

  • Create: servers_v2/tests/e2e/test_deposit_to_withdraw_flow.py
  • Create: servers_v2/tests/e2e/conftest.py
  • Create: servers_v2/tests/e2e/docker-compose.test.yml

Context: 无跨服务测试。关键业务流程(充值 → 投注 → 流水 → 提现)跨越 5 个服务。

  • Step 1: 创建测试用 docker-compose

基于 Task 1 的 docker-compose,添加测试配置:

  • 使用独立的测试数据库

  • 所有服务使用测试环境变量

  • Step 2: 编写 E2E 测试

测试完整流程:

  1. 通过 player_service 注册玩家
  2. 通过 admin_service 审核充值
  3. 通过 game_service 模拟投注回调
  4. 验证 rolling_service 流水进度
  5. 通过 wallet_service 提现
  • Step 3: Commit
git add servers_v2/tests/
git commit -m "test: add cross-service E2E test suite for deposit-to-withdraw flow"

Task 18: 事件 Schema 版本控制

Files:

  • Modify: servers/shared/contracts/src/rgb_contracts/events/base.py
  • Modify: 各 event consumer 中的反序列化逻辑
  • Test: servers/shared/contracts/tests/test_event_versioning.py

Context: DomainEvent 基类没有版本字段。当事件 schema 变更时,消费者可能收到不兼容的格式。

  • Step 1: 编写版本化事件测试
def test_event_has_schema_version():
event = DomainEvent(event_type="test", aggregate_id=1, payload={})
assert hasattr(event, 'schema_version')
assert event.schema_version == 1
  • Step 2: 在 DomainEvent 基类添加 schema_version

修改 events/base.py

class DomainEvent(BaseModel):
schema_version: int = 1
event_type: str
aggregate_id: int
payload: dict
# ... existing fields
  • Step 3: 在 event consumer 添加版本检查日志

rolling_service/app/services/event_consumer.pypromotion_service/app/services/event_consumer.py 中,处理消息前检查 schema_version,如果是未知版本则 log warning。

  • Step 4: 运行所有相关测试
cd servers/shared/contracts && uv run pytest tests/ -v
cd servers_v2/rolling_service && uv run pytest tests/ -v
cd servers_v2/promotion_service && uv run pytest tests/ -v
  • Step 5: Commit
git add servers/shared/contracts/src/rgb_contracts/events/base.py \
servers/shared/contracts/tests/test_event_versioning.py \
servers_v2/rolling_service/app/services/event_consumer.py \
servers_v2/promotion_service/app/services/event_consumer.py
git commit -m "feat: add schema_version to DomainEvent for forward compatibility"