Skip to content

模块间接口规范

定义六大引擎之间的通信协议、数据格式和错误处理。

核心原则:契约驱动,运行形态无关。 引擎通过事件和接口通信,不依赖彼此的内部实现。

通信模式

系统采用双模通信

事件驱动(异步):  引擎产出结果 → 发布事件 → 下游引擎响应
请求驱动(同步):  引擎需要数据 → 调用接口 → 获取当前状态

事件总线

前期:进程内 Python 事件总线(asyncio + pyee 或自建 EventBus) 后期:可平滑迁移至 Redis Pub/Sub(接口不变)

python
class EventBus(Protocol):
    """事件总线接口——可替换实现"""
    async def publish(self, event: Event) -> None: ...
    async def subscribe(self, event_type: str, handler: Callable) -> None: ...

class Event(BaseModel):
    """所有事件的基类"""
    event_id: str
    event_type: str                 # 事件类型枚举
    timestamp: datetime
    source_engine: str              # 产生事件的引擎
    payload: dict                   # 事件数据

状态查询

引擎需要读取其他引擎的当前状态时,通过同步接口调用:

python
class NarrativeStore(Protocol):
    """叙事存储查询接口"""
    async def get_active_narratives(self) -> list[Narrative]: ...
    async def get_narrative(self, id: str) -> Narrative | None: ...
    async def search_similar(self, embedding: list[float], top_k: int) -> list[Narrative]: ...

class DataStore(Protocol):
    """数据存储查询接口"""
    async def get_latest_indicators(self) -> IndicatorSnapshot: ...
    async def get_indicator_history(self, name: str, days: int) -> list[IndicatorValue]: ...

class SentimentStore(Protocol):
    """情绪存储查询接口"""
    async def get_latest_sentiment(self) -> SentimentSnapshot: ...

class JudgmentStore(Protocol):
    """判断存储查询接口"""
    async def get_latest_judgment(self) -> Judgment | None: ...
    async def get_judgment_history(self, limit: int) -> list[Judgment]: ...

事件类型定义

python
class EventType(str, Enum):
    # 叙事引擎产出
    NARRATIVE_CREATED = "narrative.created"             # 发现新叙事
    NARRATIVE_UPDATED = "narrative.updated"             # 叙事状态更新
    NARRATIVE_STATUS_CHANGED = "narrative.status_changed"  # 叙事状态变更
    NARRATIVE_MERGED = "narrative.merged"               # 叙事合并
    NARRATIVE_SPLIT = "narrative.split"                 # 叙事分裂

    # 数据引擎产出
    INDICATORS_UPDATED = "indicators.updated"           # 指标快照更新
    INDICATOR_ANOMALY = "indicators.anomaly"            # 指标异常

    # 情绪引擎产出
    SENTIMENT_UPDATED = "sentiment.updated"             # 情绪快照更新
    SENTIMENT_EXTREME = "sentiment.extreme"             # 极端情绪

    # 智库引擎产出
    JUDGMENT_COMPLETED = "judgment.completed"           # ensemble 判断完成

    # 错配引擎产出
    MISMATCH_DETECTED = "mismatch.detected"             # 错配信号
    MISMATCH_RESOLVED = "mismatch.resolved"             # 错配消退

    # 交易层产出
    TRADE_PROPOSED = "trade.proposed"                   # 交易提案

    # 系统事件
    VALIDATION_COMPLETED = "validation.completed"       # 事后验证完成
    PERSONA_EVOLUTION_PROPOSED = "persona.evolution_proposed"  # Persona 优化建议

引擎间数据流

                      ┌──────────────┐
                      │  RSS / API   │
                      │  新闻源      │
                      └──────┬───────┘
                             │ news articles

                   ┌──────────────────┐
                   │ ① 叙事发现引擎   │
                   └────┬──────┬──────┘
                        │      │
    narrative.created/updated  │
                        │      │
                        ▼      ▼
         ┌─────────────────────────────────┐
         │          事件总线               │
         └──┬──────────┬──────────┬────────┘
            │          │          │
            ▼          ▼          ▼
┌───────────────┐ ┌─────────┐ ┌──────────────┐
│ ② 数据采集    │ │ ③ 情绪  │ │ ④ 智库引擎   │
│    引擎       │ │   引擎  │ │ (Persona池)  │
│               │ │         │ │              │
│ 定时拉取      │ │ 定时拉取 │ │ 叙事+数据    │
│ 指标计算      │ │ 情绪聚合 │ │ +情绪 →     │
│ 异常检测      │ │         │ │ 独立判断     │
└───────┬───────┘ └────┬────┘ │ +对抗辩论    │
        │              │      │ +合成        │
        ▼              ▼      └──────┬───────┘
  indicators.    sentiment.          │
  updated        updated             │ judgment.completed
        │              │             │
        └──────────────┼─────────────┘

              ┌─────────────────┐
              │ ⑤ 错配检测引擎  │
              │                 │
              │ 智库 vs 情绪    │
              │ → Alpha/Beta    │
              └────────┬────────┘
                       │ mismatch.detected

              ┌─────────────────┐
              │ ⑥ 交易提案      │
              │    (预留)       │
              └─────────────────┘

         ⑦ 记忆系统(横贯所有引擎)
              所有事件 → 持久化
              所有查询 → 提供接口

各引擎接口定义

引擎 1:叙事发现引擎

输入:
  - news articles (来自 RSS/API 采集)

输出事件:
  - narrative.created     { narrative: Narrative }
  - narrative.updated     { narrative_id, changes: dict }
  - narrative.status_changed { narrative_id, old_status, new_status }
  - narrative.merged      { source_ids: list[str], target_id: str }
  - narrative.split       { source_id: str, target_ids: list[str] }

查询接口:
  - get_active_narratives() -> list[Narrative]
  - get_narrative(id) -> Narrative
  - search_similar(embedding, top_k) -> list[Narrative]
  - get_narrative_events(narrative_id) -> list[NarrativeEvent]
  - get_narrative_edges(narrative_id) -> list[NarrativeEdge]

触发方式:
  - 事件驱动:新新闻到达时触发
  - 定时任务:定期检查候选叙事是否应转为 active / fading

引擎 2:数据采集引擎

输入:
  - 定时触发(APScheduler)
  - 按需调用(人工查询)

输出事件:
  - indicators.updated    { snapshot: IndicatorSnapshot }
  - indicators.anomaly    { indicator_name, value, z_score, note }

查询接口:
  - get_latest_indicators() -> IndicatorSnapshot
  - get_indicator_groups() -> IndicatorGroups
  - get_indicator_history(name, days) -> list[IndicatorValue]

数据源优先级:
  - OpenBB(统一层)→ IBKR(实时补丁)→ Longbridge(中国)→ Binance(加密)

错误处理:
  - 单个数据源故障 → 降级使用其他源,标记 degraded
  - 全部故障 → 发布告警事件,使用上次快照

引擎 3:情绪引擎

输入:
  - 定时触发(APScheduler)

输出事件:
  - sentiment.updated     { snapshot: SentimentSnapshot }
  - sentiment.extreme     { snapshot, extreme_fields: list[str] }

查询接口:
  - get_latest_sentiment() -> SentimentSnapshot
  - get_sentiment_history(days) -> list[SentimentSnapshot]

触发方式:
  - 定时任务(每小时/每日)
  - 重大新闻事件时触发即时更新

引擎 4:智库引擎 (Persona Agents)

输入:
  - 触发条件满足时启动

触发条件(满足任一):
  - narrative.created / narrative.updated(新叙事或重大更新)
  - indicators.anomaly(指标异常)
  - sentiment.extreme(极端情绪)
  - 定时触发(每日至少一次完整分析)

所需数据(同步查询):
  - NarrativeStore.get_active_narratives()
  - DataStore.get_latest_indicators()
  - SentimentStore.get_latest_sentiment()
  - JudgmentStore.get_judgment_history(limit=10)  # 历史判断参考

处理流程:
  Round 1: 每个 Persona 独立判断 → PersonaJudgment
  Round 2: 共享所有判断 → 交叉质疑 → DebateRound
  Round 3: 修正判断
  Round 4: Ensemble 合成 → JudgmentSynthesis

输出事件:
  - judgment.completed    { judgment: Judgment }

引擎 5:错配检测引擎

输入:
  - judgment.completed 事件触发

所需数据(同步查询):
  - JudgmentStore.get_latest_judgment()
  - SentimentStore.get_latest_sentiment()

处理逻辑:
  1. 提取智库共识方向和确信度
  2. 提取市场情绪方向和强度
  3. 计算错配度(raw + sustained)
  4. 分类四象限
  5. 生成信号

输出事件:
  - mismatch.detected     { mismatch: MismatchScore }
  - mismatch.resolved     { mismatch_id }

触发方式:
  - 事件驱动:judgment.completed 时自动触发

引擎 6:记忆系统

角色: 横贯所有引擎的基础设施

职责:
  - 持久化所有事件(Event Sourcing)
  - 提供查询接口(NarrativeStore, DataStore, ...)
  - 生成快照(定期,避免重放开销)
  - 元学习分析(定期)

输出事件:
  - validation.completed  { validation: JudgmentValidation }
  - persona.evolution_proposed { proposal: PersonaEvolutionProposal }

触发方式:
  - 被动:所有引擎发布事件时自动持久化
  - 主动:定期生成快照、定期运行元学习

错误处理策略

python
class ErrorStrategy:
    """引擎间错误处理"""

    # 1. 单引擎故障
    #    不阻塞其他引擎,使用上次有效数据
    ENGINE_FAILURE = "use_last_valid"

    # 2. 数据源降级
    #    主源不可用时自动切换备用源
    DATA_SOURCE_FALLBACK = "cascade_fallback"

    # 3. LLM 调用失败
    #    重试 3 次 → 切换模型 → 跳过该 Persona(降低 ensemble 置信度)
    LLM_FAILURE = "retry_then_skip"

    # 4. 事件丢失
    #    事件持久化到数据库,重启后可重放
    EVENT_PERSISTENCE = "durable_events"

    # 5. 超时
    #    每个引擎有最大执行时间,超时后终止并记录
    TIMEOUT = "terminate_and_log"

超时配置

python
TIMEOUTS = {
    "narrative_discovery": 30,      # 30 秒
    "data_collection": 60,          # 60 秒(多数据源)
    "sentiment": 30,                # 30 秒
    "persona_single": 120,          # 单个 Persona 2 分钟
    "persona_total": 600,           # 全部 Persona 10 分钟
    "synthesis": 120,               # 合成分析 2 分钟
    "mismatch_detection": 10,       # 10 秒(纯计算)
}

接口版本兼容

python
class EventVersion(BaseModel):
    """事件版本标记"""
    schema_version: str = "1.0.0"   # 语义化版本

    # 兼容规则:
    # - 补丁版本(1.0.x):向后兼容,可忽略新字段
    # - 次版本(1.x.0):向后兼容,新字段有默认值
    # - 主版本(x.0.0):不兼容,需要迁移

已讨论决定

  • [x] 双模通信:事件驱动(异步)+ 状态查询(同步)
  • [x] 事件总线:前期进程内,后期可迁移 Redis Pub/Sub
  • [x] 接口定义:Protocol 类,运行形态无关
  • [x] 触发机制:事件驱动 + 定时任务混合
  • [x] 错误处理:降级策略,不阻塞其他引擎
  • [x] 超时配置:各引擎独立超时
  • [x] 版本兼容:语义化版本

待后续细化

  • [ ] 事件持久化方案(前期 SQLite,后期 Redis Streams)
  • [ ] 重放机制的具体实现
  • [ ] 监控指标(事件延迟、处理时间、错误率)

关联文档:规范总览 · 核心数据模型 · 六大引擎详细设计

基于 VitePress 构建