模块间接口规范
定义六大引擎之间的通信协议、数据格式和错误处理。
核心原则:契约驱动,运行形态无关。 引擎通过事件和接口通信,不依赖彼此的内部实现。
通信模式
系统采用双模通信:
事件驱动(异步): 引擎产出结果 → 发布事件 → 下游引擎响应
请求驱动(同步): 引擎需要数据 → 调用接口 → 获取当前状态事件总线
前期:进程内 Python 事件总线(asyncio + pyee 或自建 EventBus) 后期:可平滑迁移至 Redis Pub/Sub(接口不变)
python
class EventBus(Protocol):
"""事件总线接口——可替换实现"""
async def publish(self, event: Event) -> None: ...
async def subscribe(self, event_type: str, handler: Callable) -> None: ...
class Event(BaseModel):
"""所有事件的基类"""
event_id: str
event_type: str # 事件类型枚举
timestamp: datetime
source_engine: str # 产生事件的引擎
payload: dict # 事件数据状态查询
引擎需要读取其他引擎的当前状态时,通过同步接口调用:
python
class NarrativeStore(Protocol):
"""叙事存储查询接口"""
async def get_active_narratives(self) -> list[Narrative]: ...
async def get_narrative(self, id: str) -> Narrative | None: ...
async def search_similar(self, embedding: list[float], top_k: int) -> list[Narrative]: ...
class DataStore(Protocol):
"""数据存储查询接口"""
async def get_latest_indicators(self) -> IndicatorSnapshot: ...
async def get_indicator_history(self, name: str, days: int) -> list[IndicatorValue]: ...
class SentimentStore(Protocol):
"""情绪存储查询接口"""
async def get_latest_sentiment(self) -> SentimentSnapshot: ...
class JudgmentStore(Protocol):
"""判断存储查询接口"""
async def get_latest_judgment(self) -> Judgment | None: ...
async def get_judgment_history(self, limit: int) -> list[Judgment]: ...事件类型定义
python
class EventType(str, Enum):
# 叙事引擎产出
NARRATIVE_CREATED = "narrative.created" # 发现新叙事
NARRATIVE_UPDATED = "narrative.updated" # 叙事状态更新
NARRATIVE_STATUS_CHANGED = "narrative.status_changed" # 叙事状态变更
NARRATIVE_MERGED = "narrative.merged" # 叙事合并
NARRATIVE_SPLIT = "narrative.split" # 叙事分裂
# 数据引擎产出
INDICATORS_UPDATED = "indicators.updated" # 指标快照更新
INDICATOR_ANOMALY = "indicators.anomaly" # 指标异常
# 情绪引擎产出
SENTIMENT_UPDATED = "sentiment.updated" # 情绪快照更新
SENTIMENT_EXTREME = "sentiment.extreme" # 极端情绪
# 智库引擎产出
JUDGMENT_COMPLETED = "judgment.completed" # ensemble 判断完成
# 错配引擎产出
MISMATCH_DETECTED = "mismatch.detected" # 错配信号
MISMATCH_RESOLVED = "mismatch.resolved" # 错配消退
# 交易层产出
TRADE_PROPOSED = "trade.proposed" # 交易提案
# 系统事件
VALIDATION_COMPLETED = "validation.completed" # 事后验证完成
PERSONA_EVOLUTION_PROPOSED = "persona.evolution_proposed" # Persona 优化建议引擎间数据流
┌──────────────┐
│ RSS / API │
│ 新闻源 │
└──────┬───────┘
│ news articles
▼
┌──────────────────┐
│ ① 叙事发现引擎 │
└────┬──────┬──────┘
│ │
narrative.created/updated │
│ │
▼ ▼
┌─────────────────────────────────┐
│ 事件总线 │
└──┬──────────┬──────────┬────────┘
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌─────────┐ ┌──────────────┐
│ ② 数据采集 │ │ ③ 情绪 │ │ ④ 智库引擎 │
│ 引擎 │ │ 引擎 │ │ (Persona池) │
│ │ │ │ │ │
│ 定时拉取 │ │ 定时拉取 │ │ 叙事+数据 │
│ 指标计算 │ │ 情绪聚合 │ │ +情绪 → │
│ 异常检测 │ │ │ │ 独立判断 │
└───────┬───────┘ └────┬────┘ │ +对抗辩论 │
│ │ │ +合成 │
▼ ▼ └──────┬───────┘
indicators. sentiment. │
updated updated │ judgment.completed
│ │ │
└──────────────┼─────────────┘
▼
┌─────────────────┐
│ ⑤ 错配检测引擎 │
│ │
│ 智库 vs 情绪 │
│ → Alpha/Beta │
└────────┬────────┘
│ mismatch.detected
▼
┌─────────────────┐
│ ⑥ 交易提案 │
│ (预留) │
└─────────────────┘
⑦ 记忆系统(横贯所有引擎)
所有事件 → 持久化
所有查询 → 提供接口各引擎接口定义
引擎 1:叙事发现引擎
输入:
- news articles (来自 RSS/API 采集)
输出事件:
- narrative.created { narrative: Narrative }
- narrative.updated { narrative_id, changes: dict }
- narrative.status_changed { narrative_id, old_status, new_status }
- narrative.merged { source_ids: list[str], target_id: str }
- narrative.split { source_id: str, target_ids: list[str] }
查询接口:
- get_active_narratives() -> list[Narrative]
- get_narrative(id) -> Narrative
- search_similar(embedding, top_k) -> list[Narrative]
- get_narrative_events(narrative_id) -> list[NarrativeEvent]
- get_narrative_edges(narrative_id) -> list[NarrativeEdge]
触发方式:
- 事件驱动:新新闻到达时触发
- 定时任务:定期检查候选叙事是否应转为 active / fading引擎 2:数据采集引擎
输入:
- 定时触发(APScheduler)
- 按需调用(人工查询)
输出事件:
- indicators.updated { snapshot: IndicatorSnapshot }
- indicators.anomaly { indicator_name, value, z_score, note }
查询接口:
- get_latest_indicators() -> IndicatorSnapshot
- get_indicator_groups() -> IndicatorGroups
- get_indicator_history(name, days) -> list[IndicatorValue]
数据源优先级:
- OpenBB(统一层)→ IBKR(实时补丁)→ Longbridge(中国)→ Binance(加密)
错误处理:
- 单个数据源故障 → 降级使用其他源,标记 degraded
- 全部故障 → 发布告警事件,使用上次快照引擎 3:情绪引擎
输入:
- 定时触发(APScheduler)
输出事件:
- sentiment.updated { snapshot: SentimentSnapshot }
- sentiment.extreme { snapshot, extreme_fields: list[str] }
查询接口:
- get_latest_sentiment() -> SentimentSnapshot
- get_sentiment_history(days) -> list[SentimentSnapshot]
触发方式:
- 定时任务(每小时/每日)
- 重大新闻事件时触发即时更新引擎 4:智库引擎 (Persona Agents)
输入:
- 触发条件满足时启动
触发条件(满足任一):
- narrative.created / narrative.updated(新叙事或重大更新)
- indicators.anomaly(指标异常)
- sentiment.extreme(极端情绪)
- 定时触发(每日至少一次完整分析)
所需数据(同步查询):
- NarrativeStore.get_active_narratives()
- DataStore.get_latest_indicators()
- SentimentStore.get_latest_sentiment()
- JudgmentStore.get_judgment_history(limit=10) # 历史判断参考
处理流程:
Round 1: 每个 Persona 独立判断 → PersonaJudgment
Round 2: 共享所有判断 → 交叉质疑 → DebateRound
Round 3: 修正判断
Round 4: Ensemble 合成 → JudgmentSynthesis
输出事件:
- judgment.completed { judgment: Judgment }引擎 5:错配检测引擎
输入:
- judgment.completed 事件触发
所需数据(同步查询):
- JudgmentStore.get_latest_judgment()
- SentimentStore.get_latest_sentiment()
处理逻辑:
1. 提取智库共识方向和确信度
2. 提取市场情绪方向和强度
3. 计算错配度(raw + sustained)
4. 分类四象限
5. 生成信号
输出事件:
- mismatch.detected { mismatch: MismatchScore }
- mismatch.resolved { mismatch_id }
触发方式:
- 事件驱动:judgment.completed 时自动触发引擎 6:记忆系统
角色: 横贯所有引擎的基础设施
职责:
- 持久化所有事件(Event Sourcing)
- 提供查询接口(NarrativeStore, DataStore, ...)
- 生成快照(定期,避免重放开销)
- 元学习分析(定期)
输出事件:
- validation.completed { validation: JudgmentValidation }
- persona.evolution_proposed { proposal: PersonaEvolutionProposal }
触发方式:
- 被动:所有引擎发布事件时自动持久化
- 主动:定期生成快照、定期运行元学习错误处理策略
python
class ErrorStrategy:
"""引擎间错误处理"""
# 1. 单引擎故障
# 不阻塞其他引擎,使用上次有效数据
ENGINE_FAILURE = "use_last_valid"
# 2. 数据源降级
# 主源不可用时自动切换备用源
DATA_SOURCE_FALLBACK = "cascade_fallback"
# 3. LLM 调用失败
# 重试 3 次 → 切换模型 → 跳过该 Persona(降低 ensemble 置信度)
LLM_FAILURE = "retry_then_skip"
# 4. 事件丢失
# 事件持久化到数据库,重启后可重放
EVENT_PERSISTENCE = "durable_events"
# 5. 超时
# 每个引擎有最大执行时间,超时后终止并记录
TIMEOUT = "terminate_and_log"超时配置
python
TIMEOUTS = {
"narrative_discovery": 30, # 30 秒
"data_collection": 60, # 60 秒(多数据源)
"sentiment": 30, # 30 秒
"persona_single": 120, # 单个 Persona 2 分钟
"persona_total": 600, # 全部 Persona 10 分钟
"synthesis": 120, # 合成分析 2 分钟
"mismatch_detection": 10, # 10 秒(纯计算)
}接口版本兼容
python
class EventVersion(BaseModel):
"""事件版本标记"""
schema_version: str = "1.0.0" # 语义化版本
# 兼容规则:
# - 补丁版本(1.0.x):向后兼容,可忽略新字段
# - 次版本(1.x.0):向后兼容,新字段有默认值
# - 主版本(x.0.0):不兼容,需要迁移已讨论决定
- [x] 双模通信:事件驱动(异步)+ 状态查询(同步)
- [x] 事件总线:前期进程内,后期可迁移 Redis Pub/Sub
- [x] 接口定义:Protocol 类,运行形态无关
- [x] 触发机制:事件驱动 + 定时任务混合
- [x] 错误处理:降级策略,不阻塞其他引擎
- [x] 超时配置:各引擎独立超时
- [x] 版本兼容:语义化版本
待后续细化
- [ ] 事件持久化方案(前期 SQLite,后期 Redis Streams)
- [ ] 重放机制的具体实现
- [ ] 监控指标(事件延迟、处理时间、错误率)