六大引擎详细设计
每个引擎的算法逻辑、处理流程、边界条件和异常处理。
本文档聚焦引擎内部设计。引擎间的通信协议见 03-interface-contract.md。
引擎 1:叙事发现引擎 (Narrative Discovery)
职责
从新闻中自动提取叙事命题,追踪演化,发现新兴叙事。
处理流程
新闻输入 → 叙事提取 → 语义匹配 → 新旧判定 → 状态更新/新建
   │          │          │          │
   ▼          ▼          ▼          ▼
raw text   structured  find       create or
           proposition similar    update
1.1 叙事提取
输入:原始新闻文本(标题 + 正文 + 元数据)
处理:LLM 提取结构化叙事命题
python
EXTRACTION_PROMPT = """
你是一个宏观经济叙事分析师。从以下新闻中提取核心叙事命题。
新闻标题:{title}
新闻正文:{content}
发布时间:{published_at}
来源:{outlet}
请提取:
1. 核心叙事主题(一句话概括)
2. 叙事描述(2-3句话详细说明)
3. 叙事情绪(鸽/鹰、多/空、乐观/悲观)
4. 叙事强度(这条新闻对叙事的支撑强度 0-1)
5. 关键词标签(3-5个)
6. 涉及的资产类别
输出 JSON 格式,字段:theme, description, sentiment, strength, tags, asset_classes
"""
输出约束:
- 必须输出合法 JSON(代码校验)
- sentiment ∈ [-1, 1]
- strength ∈ [0, 1]
- tags 数量 3-5 个
1.2 语义匹配(新旧对比)
新叙事命题提取后,与已有叙事做语义匹配:
python
async def match_narrative(new_proposition: NarrativeProposition) -> MatchResult:
    """Decide how a freshly extracted proposition relates to stored narratives.

    The proposition's theme and description are embedded as one text, the
    narrative store is queried for the five most similar narratives, and the
    similarity of the first hit selects the action:

    * above 0.85  -> ``update`` that narrative
    * above 0.6   -> ``create_sub`` under that narrative
    * otherwise (or when nothing is found) -> ``create_candidate``
    """
    # 1. Build the embedding from theme + description.
    query_text = f"{new_proposition.theme} {new_proposition.description}"
    embedding = await embedding_provider.embed_text(query_text)

    # 2. Look up the most similar existing narratives.
    neighbours = await narrative_store.search_similar(embedding, top_k=5)
    if not neighbours:
        return MatchResult(action="create_candidate")

    # 3. Classify by the first hit.
    # NOTE(review): the first element is treated as the closest match --
    # presumably search_similar returns hits sorted by descending similarity;
    # confirm against the store implementation.
    nearest = neighbours[0]
    score = nearest.similarity

    if score > 0.85:
        # High similarity: update the existing narrative.
        return MatchResult(
            action="update",
            target_id=nearest.narrative_id,
            changes=compute_changes(nearest, new_proposition),
        )

    if score > 0.6:
        # Medium similarity: possibly a sub-narrative or a related narrative.
        return MatchResult(
            action="create_sub",
            parent_id=nearest.narrative_id,
            relation_type="intersects",
        )

    # Low similarity: candidate for a new narrative.
    return MatchResult(action="create_candidate")
### 1.3 叙事状态机转换
python
# 状态转换规则
TRANSITIONS = {
"candidate": {
"to_active": "多源确认(>=3个独立来源)且 strength > 0.3",
"to_rejected": "48小时内无新来源确认",
},
"active": {
"to_fading": "strength_ewma < 0.2 且 trend == weakening",
},
"fading": {
"to_archived": "strength_ewma < 0.1 或 7天无更新",
"to_active": "新来源使 strength 回升 > 0.4",
},
}
1.4 叙事合并/分裂检测
python
async def check_merge_split():
    """Propose narrative merges and splits for human confirmation.

    Meant to be run periodically (every 6 hours according to the design).

    Merge detection: weak narratives (strength at most 0.3) that are
    strengthening are clustered semantically with threshold 0.8; every cluster
    of two or more narratives is published as ``narrative.merge_proposed``.

    Split detection: for each strong narrative (strength at least 0.7) the
    standard deviation of its sources' sentiment is computed; a value above
    0.4 is published as ``narrative.split_proposed``.
    """
    # Merge detection: several weak narratives that are highly related and
    # strengthening at the same time.
    weak = await get_narratives(max_strength=0.3, trend="strengthening")
    for cluster in find_semantic_clusters(weak, threshold=0.8):
        if len(cluster) < 2:
            continue
        await event_bus.publish(Event(
            type="narrative.merge_proposed",
            payload={"narrative_ids": [n.id for n in cluster]},
        ))  # submitted for human confirmation

    # Split detection: sentiment of the sources inside one narrative diverges.
    strong = await get_narratives(min_strength=0.7)
    for narrative in strong:
        sentiments = [source.sentiment for source in narrative.sources]
        if not sentiments:
            # np.std([]) yields NaN plus a RuntimeWarning; with no sources
            # there is no divergence to report.
            continue
        # Computed once and reused for both the test and the payload.
        spread = float(np.std(sentiments))
        if spread > 0.4:
            await event_bus.publish(Event(
                type="narrative.split_proposed",
                # The key name is kept as-is for consumers; the value is a
                # standard deviation, not a variance.
                payload={"narrative_id": narrative.id, "sentiment_variance": spread},
            ))  # submitted for human confirmation
### 边界条件
- 空新闻/噪音新闻 → LLM 返回空结果,跳过
- LLM 输出格式错误 → 重试 2 次,仍失败则记录日志跳过
- 重复新闻(相同 URL)→ 幂等去重
- 非宏观新闻(体育、娱乐)→ LLM 标签过滤
引擎 2:数据采集引擎 (Data Collection)
职责
从多个 API 获取市场数据,计算宏观指标,标记异常。
2.1 数据源接入架构
python
class DataProvider(Protocol):
    """Data source interface; every data source implements this protocol."""
    # Identifier of the data source.
    name: DataSource
    # Fetch the data described by `request`.
    async def fetch(self, request: DataRequest) -> DataResponse: ...
    # Report whether the source is currently healthy.
    async def health_check(self) -> bool: ...
# Implementations
class OpenBBProvider(DataProvider): ...  # unified layer
class IBKRProvider(DataProvider): ...  # real-time patch
class LongbridgeProvider(DataProvider): ...  # China market
class BinanceProvider(DataProvider): ...  # crypto perpetuals
class DataCollector:
    """Core of the data collection engine."""

    providers: list[DataProvider]

    async def collect_all(self) -> IndicatorSnapshot:
        """Fetch from every provider concurrently and merge the outcomes.

        Because ``return_exceptions=True`` is used, a failing fetch shows up
        as an exception object in the outcome list instead of aborting the
        whole gather; the list is handed to ``merge_results``, which merges
        the results and applies degradation handling.
        """
        # NOTE(review): `request` is not defined in this method or class as
        # shown; presumably it comes from an enclosing scope -- confirm, or
        # make it an explicit parameter.
        pending = [provider.fetch(request) for provider in self.providers]
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
        return self.merge_results(outcomes)
### 2.2 指标计算逻辑
python
class IndicatorCalculator:
    """Indicator calculations: purely deterministic code, no LLM involved."""

    def calc_yield_curve_slope(self, rates: dict) -> IndicatorValue:
        """Return the yield-curve slope (``rates["10Y"] - rates["2Y"]``).

        The result carries the z-score, percentile and trend of the
        ``yield_curve_2s10s`` series; it is flagged as an anomaly when the
        absolute z-score exceeds 2.

        Raises:
            KeyError: if ``rates`` lacks the ``"10Y"`` or ``"2Y"`` key.
        """
        name = "yield_curve_2s10s"
        value = rates["10Y"] - rates["2Y"]
        # Computed once: each z-score call re-reads a year of history.
        z_score = self.calc_z_score(name, value)
        return IndicatorValue(
            name=name,
            category=IndicatorCategory.rates,
            value=value,
            unit="percent",
            z_score=z_score,
            percentile=self.calc_percentile(name, value),
            trend=self.calc_trend(name),
            is_anomaly=abs(z_score) > 2,
        )

    def calc_spread(self, series_a: str, series_b: str) -> IndicatorValue:
        """Spread between two series (not implemented here)."""
        ...

    def calc_funding_rate_stats(self, rates: list[float]) -> IndicatorValue:
        """Funding-rate statistics (not implemented here)."""
        ...

    def calc_z_score(self, indicator_name: str, current: float) -> float:
        """Return the z-score of ``current`` against 252 days of history.

        Returns 0.0 when there is no history or the history has no positive
        standard deviation, so callers never receive NaN or infinity.
        """
        history = self.get_history(indicator_name, days=252)  # one year
        if len(history) == 0:
            # np.mean/np.std of an empty sequence give NaN and warn.
            return 0.0
        std = np.std(history)
        if not std > 0:
            return 0.0
        return float((current - np.mean(history)) / std)

    def calc_percentile(self, indicator_name: str, current: float) -> float:
        """Return the fraction of the 252-day history strictly below ``current``.

        With no history the neutral midpoint 0.5 is returned instead of
        dividing by zero.
        """
        history = self.get_history(indicator_name, days=252)
        if len(history) == 0:
            return 0.5
        return sum(1 for v in history if v < current) / len(history)

    def calc_trend(self, indicator_name: str) -> TrendDirection:
        """Classify the trend from the linear-regression slope of 20 days.

        A slope above 0.01 is rising, below -0.01 is falling, anything else
        is stable. Fewer than two points cannot define a slope and are
        reported as stable.
        """
        recent = self.get_history(indicator_name, days=20)
        if len(recent) < 2:
            return TrendDirection.stable
        slope = np.polyfit(range(len(recent)), recent, 1)[0]
        # NOTE(review): the 0.01 cut-off is an absolute slope, so it depends
        # on the unit of each indicator -- confirm this is intended.
        if slope > 0.01:
            return TrendDirection.rising
        if slope < -0.01:
            return TrendDirection.falling
        return TrendDirection.stable
### 2.3 采集调度
python
SCHEDULE = {
# 高频(实时层)
"funding_rate": {"interval": "1min", "provider": "binance"},
"orderbook_depth": {"interval": "5min", "provider": "binance"},
# 中频(日频层)
"yield_curve": {"interval": "1hour", "provider": "openbb"},
"credit_spreads": {"interval": "1hour", "provider": "openbb"},
"equity_indices": {"interval": "5min", "provider": "ibkr"},
"fx_rates": {"interval": "5min", "provider": "ibkr"},
# 低频
"economic_calendar":{"interval": "1day", "provider": "openbb"},
"cftc_cot": {"interval": "1week", "provider": "openbb"},
"northbound_flow": {"interval": "1day", "provider": "longbridge"},
}
边界条件
- 数据源超时 → 跳过该源,标记 degraded,使用上次数据
- 数据格式异常 → 记录日志,跳过该条目
- 全部数据源故障 → 发布告警事件,保持上次快照
- 数据源限流 → 降低采集频率,记录限流状态
引擎 3:市场情绪引擎 (Market Sentiment)
职责
采集散户情绪、公众舆论、机构行为,输出情绪快照。
3.1 散户情绪采集
python
class RetailSentimentCollector:
    """Collects the retail-investor sentiment readings."""

    async def collect(self) -> RetailSentiment:
        """Gather the five retail readings concurrently.

        The results are passed positionally to ``RetailSentiment`` in the
        order listed below, so that order has to match the constructor.
        """
        results = await asyncio.gather(
            self.get_fear_greed(),        # alternative.me
            self.get_put_call_ratio(),    # IBKR / CBOE
            self.get_margin_balance(),    # Longbridge
            self.get_social_sentiment(),  # Xueqiu / Twitter NLP
            self.get_search_trend(),      # Baidu Index / Google Trends
        )
        return RetailSentiment(*results)

    async def get_social_sentiment(self) -> float:
        """Social-media sentiment: LLM batch scoring, averaged.

        Up to 100 recent posts are scored in one batch and the mean score is
        returned. When no scores come back, 0.0 is returned rather than the
        NaN that ``np.mean`` gives for an empty input, because NaN would
        propagate into every downstream calculation.
        """
        posts = await fetch_recent_posts(limit=100)
        # Send the batch to the LLM for scoring.
        scores = await llm_batch_score(posts, task="sentiment")
        if len(scores) == 0:
            # NOTE(review): 0.0 is used as the neutral value, assuming scores
            # live on the [-1, 1] scale used for social_sentiment elsewhere
            # in this document -- confirm.
            return 0.0
        return float(np.mean(scores))
### 3.2 媒体情绪采集
python
class MediaSentimentCollector:
    """Collects media-side sentiment: headlines, central banks and KOLs."""

    async def collect(self) -> MediaSentiment:
        """Build a ``MediaSentiment`` from the last 24 hours of media input.

        Headline and KOL scores are averaged; when a score list is empty the
        average is reported as 0.0 instead of the NaN that ``np.mean``
        produces for empty input.
        """
        def mean_or_neutral(scores) -> float:
            # NOTE(review): 0.0 is taken as neutral, assuming a [-1, 1]
            # sentiment scale -- confirm.
            return float(np.mean(scores)) if len(scores) else 0.0

        # Headline sentiment (LLM scoring).
        headlines = await fetch_recent_headlines(hours=24)
        news_scores = await llm_batch_score(headlines, task="sentiment")

        # Dovish/hawkish tone of central-bank statements (LLM analysis).
        statements = await fetch_central_bank_statements()
        cb_tone = await llm_analyze_central_bank_tone(statements)

        # Aggregated KOL views.
        kol_posts = await fetch_kol_posts()
        kol_scores = await llm_batch_score(kol_posts, task="sentiment")

        return MediaSentiment(
            news_sentiment=mean_or_neutral(news_scores),
            news_volume=len(headlines),
            central_bank_tone=cb_tone,
            kol_consensus=mean_or_neutral(kol_scores),
        )
### 3.3 机构行为采集
python
class InstitutionalSentimentCollector:
    """Collects indicators of institutional behaviour."""

    async def collect(self) -> InstitutionalSentiment:
        """Fetch the four institutional readings one after another."""
        cash_level = await self.get_fund_cash_level()  # BofA Survey
        etf_flow = await self.get_etf_flow()           # ETF.com
        cftc_position = await self.get_cftc_position() # CFTC
        ipo_activity = await self.get_ipo_activity()   # SEC filing
        return InstitutionalSentiment(
            fund_cash_level=cash_level,
            etf_flow_direction=etf_flow,
            cftc_net_position=cftc_position,
            ipo_activity=ipo_activity,
        )
### 3.4 极端情绪检测
python
# Per-field bounds for extreme-sentiment detection: check_extreme reports a
# field when its value is below "low" or above "high".
EXTREME_THRESHOLDS = {
    "fear_greed_index": {"low": 15, "high": 85},
    "put_call_ratio": {"low": 0.6, "high": 1.2},
    "social_sentiment": {"low": -0.7, "high": 0.7},
}
async def check_extreme(snapshot: SentimentSnapshot) -> list[str]:
    """Detect extreme sentiment and return the names of the extreme fields.

    Each field listed in ``EXTREME_THRESHOLDS`` is read from the snapshot via
    ``get_nested`` and reported when it is below the field's ``low`` bound or
    above its ``high`` bound. Fields whose value is ``None`` are skipped.
    """
    extremes = []
    for field, bounds in EXTREME_THRESHOLDS.items():
        value = get_nested(snapshot, field)
        if value is None:
            # A missing reading cannot be ordered against the bounds
            # (None < number raises TypeError), so it is not reported.
            # NOTE(review): assumes get_nested returns None for a missing
            # value -- confirm.
            continue
        if value < bounds["low"] or value > bounds["high"]:
            extremes.append(field)
    return extremes
## 引擎 4:智库引擎 (Think Tank / Persona Agents)
职责
基于蒸馏 Persona,以蒙特卡罗对抗辩论的形式输出理性分析。
4.1 分析触发条件
python
TRIGGER_CONDITIONS = [
# 事件驱动
{"event": "narrative.created", "min_strength": 0.4},
{"event": "narrative.updated", "min_strength_change": 0.2},
{"event": "indicators.anomaly"},
{"event": "sentiment.extreme"},
# 定时驱动
{"schedule": "daily", "time": "08:00 UTC"}, # 每日一次完整分析
]
4.2 Persona 独立分析
python
async def persona_analyze(
    persona: PersonaConfig,
    context: AnalysisContext
) -> PersonaJudgment:
    """Run the independent analysis of a single persona.

    The prompt combines the persona's analysis framework with the formatted
    narratives, indicators, sentiment and recent judgments from ``context``.
    The LLM is asked for a JSON object, which is validated against
    ``PersonaJudgmentSchema`` and wrapped in a ``PersonaJudgment`` stamped
    with the persona's id and current version.
    """
    # The prompt text is kept flush-left so the string sent to the model is
    # exactly the template below.
    prompt = f"""
{persona.analysis_framework}
---
当前叙事状态:
{format_narratives(context.narratives)}
当前市场指标:
{format_indicators(context.indicators)}
当前市场情绪:
{format_sentiment(context.sentiment)}
历史判断参考:
{format_history(context.recent_judgments)}
请按照 {persona.scoring_rubric} 的标准,给出你的独立判断。
输出 JSON 格式:
- regime: MacroRegime 枚举
- regime_confidence: [0, 1]
- narrative_assessment: string(你的叙事分析)
- narrative_belief: [-1, 1]
- scenarios: list[Scenario]
- reasoning_chain: string(完整推理过程)
"""
    raw = await llm.generate(
        prompt,
        model=ANALYSIS_MODEL,  # Claude Sonnet / GPT-4o
        temperature=0.1,  # low temperature to reduce randomness
        response_format={"type": "json_object"},
    )
    parsed = validate_json(raw, PersonaJudgmentSchema)
    return PersonaJudgment(
        persona_id=persona.id,
        persona_version=persona.current_version,
        **parsed,
    )
### 4.3 对抗辩论
python
async def run_debate(
    judgments: list[PersonaJudgment],
    context: AnalysisContext
) -> list[DebateRound]:
    """Run up to ``MAX_DEBATE_ROUNDS`` rounds of adversarial debate.

    In every round each persona challenges every other persona it
    significantly disagrees with; the LLM writes the challenge and simulates
    the target's response. A round is always recorded, and the debate stops
    after the first round that produced no challenges.

    The LLM is asked for JSON mode, as in ``persona_analyze``, because the
    reply is validated as JSON against ``DebateChallengeSchema``.
    """
    # NOTE(review): `judgments` is never revised between rounds, so every
    # round is built from the same inputs -- confirm whether later rounds
    # should see the earlier exchanges.
    rounds = []
    for round_num in range(1, MAX_DEBATE_ROUNDS + 1):
        challenges = []
        for challenger in judgments:
            for target in judgments:
                if challenger.persona_id == target.persona_id:
                    continue
                # Only debate where there is a substantive disagreement.
                if not has_significant_disagreement(challenger, target):
                    continue
                # Generate the challenge (prompt kept flush-left so the text
                # sent to the model is exactly the template below).
                challenge_prompt = f"""
你是 {challenger.persona_id},你的判断是:
Regime: {challenger.regime} (confidence: {challenger.regime_confidence})
叙事看法: {challenger.narrative_assessment}
你正在质疑 {target.persona_id},他的判断是:
Regime: {target.regime} (confidence: {target.regime_confidence})
叙事看法: {target.narrative_assessment}
当前数据:
{format_indicators(context.indicators)}
请提出你的质疑(具体、有数据支撑、指向对方推理的薄弱环节)。
然后模拟 {target.persona_id} 的回应。
输出 JSON:{{"challenge": "...", "response": "..."}}
"""
                result = await llm.generate(
                    challenge_prompt,
                    temperature=0.2,
                    response_format={"type": "json_object"},
                )
                challenges.append(DebateChallenge(
                    challenger_id=challenger.persona_id,
                    target_id=target.persona_id,
                    **validate_json(result, DebateChallengeSchema)
                ))
        rounds.append(DebateRound(round_number=round_num, challenges=challenges))
        # No substantive disagreement left: finish early.
        if not challenges:
            break
    return rounds
### 4.4 Ensemble 合成
python
async def synthesize(
    judgments: list[PersonaJudgment],
    persona_weights: dict[str, float],
    debate_rounds: list[DebateRound],
    context: AnalysisContext | None = None,
) -> JudgmentSynthesis:
    """Combine the persona judgments into one weighted final judgment.

    Args:
        judgments: the independent persona judgments; must not be empty.
        persona_weights: weight per ``persona_id``; every judged persona
            needs an entry.
        debate_rounds: debate transcript used to extract disagreements.
        context: analysis context whose indicators are compared with the
            narrative belief. The body always needed it but it was not a
            parameter; it is optional only to keep the signature
            backward-compatible.

    Returns:
        The synthesis with the winning regime, its share of the weighted
        vote as confidence, and the derived consensus and tension fields.

    Raises:
        ValueError: if ``judgments`` is empty or ``context`` is not given.
        KeyError: if a persona has no entry in ``persona_weights``.
    """
    if not judgments:
        raise ValueError("synthesize() requires at least one persona judgment")
    if context is None:
        raise ValueError("synthesize() requires the analysis context")

    # Weighted regime vote and weighted narrative belief in one pass.
    regime_votes: dict[MacroRegime, float] = defaultdict(float)
    belief_total = 0.0
    weight_total = 0.0
    for j in judgments:
        weight = persona_weights[j.persona_id]
        regime_votes[j.regime] += weight * j.regime_confidence
        belief_total += j.narrative_belief * weight
        weight_total += weight

    final_regime = max(regime_votes, key=regime_votes.get)
    vote_total = sum(regime_votes.values())
    # All-zero weights or confidences would otherwise divide by zero.
    final_confidence = regime_votes[final_regime] / vote_total if vote_total else 0.0
    narrative_belief = belief_total / weight_total if weight_total else 0.0

    # Consensus level across personas.
    consensus = calculate_consensus(judgments)
    # Points of disagreement taken from judgments and the debate.
    disagreements = extract_disagreements(judgments, debate_rounds)

    return JudgmentSynthesis(
        regime=final_regime,
        regime_confidence=final_confidence,
        narrative_data_alignment=assess_alignment(narrative_belief, context.indicators),
        key_tension=identify_tension(judgments, context),
        scenarios=merge_scenarios(judgments),
        persona_consensus_level=consensus,
        key_disagreements=disagreements,
        invalidation_conditions=extract_invalidation(judgments),
    )
### 边界条件
- 单个 Persona LLM 调用超时 → 跳过该 Persona,降低 ensemble 置信度
- 所有 Persona 超时 → 记录错误,不输出判断
- 辩论陷入循环 → 最多 3 轮,强制结束
- 所有 Persona 高度一致 → 触发"反共识"检查(是否有被忽略的风险)
引擎 5:错配检测引擎 (Mismatch Detection)
职责
将智库引擎的理性分析与市场情绪进行对比,分类 Alpha 和 Beta 信号。
5.1 计算逻辑
python
async def detect_mismatch(judgment: Judgment) -> MismatchScore:
    """Score the mismatch between the think-tank view and market sentiment.

    Apart from loading the latest sentiment and the mismatch history, the
    calculation is plain deterministic arithmetic: a persona direction, a
    weighted sentiment direction, their difference, the time the mismatch
    has lasted, a quadrant and a capped signal strength.
    """
    sentiment = await sentiment_store.get_latest_sentiment()
    synthesis = judgment.synthesis

    # 1. Think-tank consensus direction in [-1, 1].
    # Per the design note: expansion/recovery -> +1, recession/deflation -> -1,
    # stagflation/late_cycle -> 0 +/- 0.3.
    persona_direction = synthesis.regime_to_direction()
    persona_confidence = synthesis.regime_confidence

    # 2. Market sentiment direction in [-1, 1]: weighted blend of four inputs.
    fear_greed_part = normalize(sentiment.retail.fear_greed_index, 0, 100, -1, 1) * 0.3
    news_part = normalize(sentiment.media.news_sentiment, -1, 1, -1, 1) * 0.3
    social_part = normalize(sentiment.retail.social_sentiment, -1, 1, -1, 1) * 0.2
    etf_part = normalize(sentiment.institutional.etf_flow_direction, -1, 1, -1, 1) * 0.2
    sentiment_direction = fear_greed_part + news_part + social_part + etf_part
    sentiment_intensity = abs(sentiment_direction)

    # 3. Raw mismatch.
    mismatch_raw = persona_direction - sentiment_direction

    # 4. Duration adjustment (needs the mismatch history).
    duration = await calc_mismatch_duration(judgment.narrative_snapshot_id, mismatch_raw)
    mismatch_sustained = mismatch_raw * duration.total_hours

    # 5. Quadrant.
    quadrant = classify_quadrant(persona_direction, sentiment_direction)

    # 6. Signal strength: longer mismatches count more, capped at one week.
    mismatch_part = abs(mismatch_raw) * 0.4
    confidence_part = persona_confidence * 0.3
    duration_part = min(duration.total_days / 7, 1.0) * 0.3
    signal_strength = min(1.0, mismatch_part + confidence_part + duration_part)

    return MismatchScore(
        judgment_id=judgment.judgment_id,
        persona_consensus_direction=persona_direction,
        persona_consensus_confidence=persona_confidence,
        sentiment_direction=sentiment_direction,
        sentiment_intensity=sentiment_intensity,
        mismatch_raw=mismatch_raw,
        mismatch_duration=duration,
        mismatch_sustained=mismatch_sustained,
        quadrant=quadrant,
        signal_strength=signal_strength,
    )
def classify_quadrant(
    persona_dir: float,
    sentiment_dir: float
) -> MismatchQuadrant:
    """Map the two directions onto a mismatch quadrant.

    Opposite signs are alpha signals (named after the persona's side), equal
    signs are beta signals. A bullish persona view with exactly neutral
    sentiment (``sentiment_dir == 0``) is classified as ``beta_long``;
    previously it fell through to ``beta_short``, while the mirrored bearish
    case was already ``beta_short``.
    """
    if persona_dir > 0 and sentiment_dir < 0:
        return MismatchQuadrant.alpha_long
    if persona_dir < 0 and sentiment_dir > 0:
        return MismatchQuadrant.alpha_short
    if persona_dir > 0 and sentiment_dir >= 0:
        return MismatchQuadrant.beta_long
    # NOTE(review): a persona direction of exactly 0 (no directional view)
    # still ends up here regardless of sentiment, as before -- confirm
    # whether a neutral outcome is needed.
    return MismatchQuadrant.beta_short
### 5.2 信号过滤
不是所有错配都值得行动:
python
# Minimum values a mismatch must reach before should_signal accepts it.
SIGNAL_THRESHOLDS = {
    "min_mismatch_raw": 0.3,  # mismatch magnitude threshold
    "min_confidence": 0.5,  # minimum think-tank confidence
    "min_duration_hours": 24,  # minimum duration
    "min_signal_strength": 0.5,  # minimum signal strength
}
async def should_signal(mismatch: MismatchScore) -> bool:
    """Tell whether a mismatch clears every threshold in ``SIGNAL_THRESHOLDS``.

    The checks run in order -- mismatch magnitude, persona confidence,
    duration in hours, signal strength -- and stop at the first failure.
    """
    limits = SIGNAL_THRESHOLDS
    # Written as `not (x >= limit)` so a NaN value fails the check.
    if not (abs(mismatch.mismatch_raw) >= limits["min_mismatch_raw"]):
        return False
    if not (mismatch.persona_consensus_confidence >= limits["min_confidence"]):
        return False
    if not (mismatch.mismatch_duration.total_hours >= limits["min_duration_hours"]):
        return False
    return mismatch.signal_strength >= limits["min_signal_strength"]
## 引擎 6:记忆系统 (Memory System)
职责
横贯所有引擎的基础设施:持久化、查询、快照、元学习。
6.1 事件持久化
python
class EventStore:
    """Event store: the events of every engine pass through here."""

    async def append(self, event: Event):
        """Append one event; stored events are treated as immutable.

        The payload is serialised with ``json.dumps`` and inserted together
        with the event's id, type, timestamp and source engine through a
        parameterised statement.
        """
        # NOTE(review): this reads `event.event_type`, while other snippets
        # in this document build events with `Event(type=...)` -- confirm the
        # field name or alias.
        statement = (
            "INSERT INTO events (event_id, event_type, timestamp, source_engine, payload) "
            "VALUES ($1, $2, $3, $4, $5)"
        )
        values = (
            event.event_id,
            event.event_type,
            event.timestamp,
            event.source_engine,
            json.dumps(event.payload),
        )
        await db.execute(statement, *values)

    async def get_events(
        self,
        event_type: str | None = None,
        source_engine: str | None = None,
        since: datetime | None = None,
        limit: int = 100
    ) -> list[Event]:
        """Query events (not implemented here)."""
        ...
### 6.2 快照生成
python
class SnapshotManager:
    """Generates snapshots periodically so queries need not replay all events."""

    SCHEDULE = {
        "narrative_snapshots": "every_6_hours",
        "indicator_snapshots": "every_1_hour",
        "sentiment_snapshots": "every_1_hour",
    }

    async def generate_narrative_snapshot(self, narrative_id: str):
        """Rebuild a narrative's current state from the event stream.

        The narrative's events are fetched, replayed into a state, and the
        state is upserted into ``narrative_snapshots``.
        """
        # NOTE(review): these filter keywords are not among the parameters of
        # EventStore.get_events as shown in section 6.1 (event_type,
        # source_engine, since, limit), and that method defaults to
        # limit=100 -- confirm the query interface and that the full event
        # history is returned.
        filters = {
            "event_type__startswith": "narrative.",
            "payload__contains": {"narrative_id": narrative_id},
        }
        events = await event_store.get_events(**filters)
        # Replaying the event stream yields the current state.
        current_state = replay_events(events)
        await db.upsert("narrative_snapshots", current_state)
### 6.3 元学习
python
class MetaLearner:
    """Periodically reviews system performance and outputs improvement proposals."""

    async def run_weekly_review(self):
        """Run the weekly review and publish persona evolution proposals.

        Validated judgments of the last 7 days are grouped by persona; every
        persona whose share of correct judgments is below 0.4 gets a
        ``PersonaEvolutionProposal``. Proposals are only published as
        ``persona.evolution_proposed`` events for human approval, never
        applied automatically.
        """
        # 1. All validated judgments of this period.
        window_start = datetime.now() - timedelta(days=7)
        validated = await judgment_store.get_validated(since=window_start)

        # 2. Per-persona outcomes.
        outcomes_by_persona = {}
        for judgment in validated:
            for persona_id, correct in judgment.validation.persona_accuracy.items():
                outcomes_by_persona.setdefault(persona_id, []).append(correct)

        # 3. Performance per regime.
        regime_analysis = self.analyze_by_regime(validated)

        # 4. Misjudgment patterns per narrative type (result not used below).
        narrative_errors = self.analyze_narrative_errors(validated)

        # 5. Build proposals (not executed automatically).
        proposals = []
        for persona_id, outcomes in outcomes_by_persona.items():
            accuracy = sum(outcomes) / len(outcomes)
            if not accuracy < 0.4:
                continue
            proposal = PersonaEvolutionProposal(
                persona_id=persona_id,
                identified_weakness=f"本周期准确率 {accuracy:.0%}",
                evidence=self.collect_evidence(persona_id, validated),
                proposed_changes=self.suggest_changes(persona_id, regime_analysis),
            )
            proposals.append(proposal)

        # 6. Publish events for human approval.
        for proposal in proposals:
            event = Event(
                type="persona.evolution_proposed",
                payload=proposal.dict(),
            )
            await event_bus.publish(event)

    def analyze_by_regime(self, judgments) -> dict:
        """Analyse each persona's performance per regime (not implemented here)."""
        ...

    def analyze_narrative_errors(self, judgments) -> list:
        """Analyse misjudgment patterns per narrative type (not implemented here)."""
        ...
### 6.4 事后验证
python
class ValidationEngine:
    """After-the-fact validation of judgments."""

    async def validate_judgment(self, judgment: Judgment, lookforward_days: int = 7):
        """Validate a judgment against what the indicators did afterwards.

        The indicator snapshot taken with the judgment is compared with the
        latest indicators to infer the actual regime; the synthesis and each
        persona judgment are marked correct when their regime equals it. The
        validation record is attached to the judgment and every persona's
        performance statistics are updated.
        """
        # NOTE(review): `lookforward_days` is not referenced in this body, so
        # nothing here checks that N days have passed -- confirm the caller
        # schedules the validation.
        # Indicators at judgment time and now.
        then_indicators = await data_store.get_snapshot(judgment.indicator_snapshot_id)
        now_indicators = await data_store.get_latest_indicators()

        # Infer the regime that actually materialised.
        actual_regime = self.infer_actual_regime(then_indicators, now_indicators)

        # Was each persona right?
        persona_accuracy = {
            pj.persona_id: (pj.regime == actual_regime)
            for pj in judgment.persona_judgments
        }

        # Build the validation record.
        validation = JudgmentValidation(
            validated_at=datetime.now(),
            actual_outcome=f"实际 regime: {actual_regime}",
            regime_was_correct=(judgment.synthesis.regime == actual_regime),
            persona_accuracy=persona_accuracy,
            lessons_learned=await self.generate_lessons(judgment, actual_regime),
        )

        # Attach it to the judgment.
        await judgment_store.attach_validation(judgment.judgment_id, validation)

        # Update the per-persona performance statistics.
        for persona_id, was_correct in persona_accuracy.items():
            await persona_store.update_performance(persona_id, was_correct)
## 引擎间依赖与执行顺序
独立运行(无依赖):
② 数据采集 ←── 定时
③ 情绪采集 ←── 定时
依赖新闻输入(事件驱动):
① 叙事发现 ←── 新闻输入
依赖 ①②③ 的输出:
④ 智库引擎 ←── 需要叙事 + 指标 + 情绪快照
依赖 ④ 的输出:
⑤ 错配检测 ←── 需要判断结果
横贯所有:
⑥ 记忆系统 ←── 被动接收所有事件