Skip to content

六大引擎详细设计

每个引擎的算法逻辑、处理流程、边界条件和异常处理。

本文档聚焦引擎内部设计。引擎间的通信协议见 03-interface-contract.md。


引擎 1:叙事发现引擎 (Narrative Discovery)

职责

从新闻中自动提取叙事命题,追踪演化,发现新兴叙事。

处理流程

新闻输入 → 叙事提取 → 语义匹配 → 新旧判定 → 状态更新/新建
   │           │           │           │
   ▼           ▼           ▼           ▼
 raw text   structured   find       create or
            proposition  similar    update

1.1 叙事提取

输入:原始新闻文本(标题 + 正文 + 元数据)

处理:LLM 提取结构化叙事命题

python
# Prompt template for narrative extraction. Rendered with str.format(), so it
# takes exactly four fields: title, content, published_at, outlet.
# The sentiment item asks for a number in [-1, 1] so the LLM output can satisfy
# the documented output constraint (sentiment ∈ [-1, 1]); the earlier wording
# listed only categorical labels.
# NOTE(review): the sign convention (+1 = dovish/bullish/optimistic) is assumed
# from the order of the original label pairs — confirm against the interface contract.
EXTRACTION_PROMPT = """
你是一个宏观经济叙事分析师。从以下新闻中提取核心叙事命题。

新闻标题:{title}
新闻正文:{content}
发布时间:{published_at}
来源:{outlet}

请提取:
1. 核心叙事主题(一句话概括)
2. 叙事描述(2-3句话详细说明)
3. 叙事情绪(-1 到 1 之间的数值:1 表示鸽/多/乐观,-1 表示鹰/空/悲观)
4. 叙事强度(这条新闻对叙事的支撑强度 0-1)
5. 关键词标签(3-5个)
6. 涉及的资产类别

输出 JSON 格式,字段:theme, description, sentiment, strength, tags, asset_classes
"""

输出约束

  • 必须输出合法 JSON(代码校验)
  • sentiment ∈ [-1, 1]
  • strength ∈ [0, 1]
  • tags 数量 3-5 个

1.2 语义匹配(新旧对比)

新叙事命题提取后,与已有叙事做语义匹配:

python
async def match_narrative(new_proposition: NarrativeProposition) -> MatchResult:
    """Decide how a newly extracted proposition relates to the stored narratives.

    The proposition's theme and description are embedded, the five nearest
    stored narratives are looked up, and the first hit's similarity picks the
    action:

    * above 0.85            -> "update" the matched narrative
    * above 0.6 (up to 0.85) -> "create_sub" linked to the matched narrative
    * otherwise, or no hits -> "create_candidate"
    """
    query_text = f"{new_proposition.theme} {new_proposition.description}"
    vector = await embedding_provider.embed_text(query_text)

    hits = await narrative_store.search_similar(vector, top_k=5)
    if not hits:
        return MatchResult(action="create_candidate")

    # assumes search_similar returns hits ordered by descending similarity — TODO confirm
    top_hit = hits[0]

    if top_hit.similarity > 0.85:
        # High similarity: fold the new proposition into the existing narrative.
        return MatchResult(
            action="update",
            target_id=top_hit.narrative_id,
            changes=compute_changes(top_hit, new_proposition),
        )

    if top_hit.similarity > 0.6:
        # Medium similarity: possibly a sub-narrative or a related narrative.
        return MatchResult(
            action="create_sub",
            parent_id=top_hit.narrative_id,
            relation_type="intersects",
        )

    # Low similarity: treat it as a candidate for a new narrative.
    return MatchResult(action="create_candidate")

1.3 叙事状态机转换

python
# Narrative state-machine transition rules.
# Outer keys are the current state; each inner key is named "to_<target state>"
# and maps to a human-readable description of the triggering condition.
# NOTE(review): the conditions are descriptive strings, not executable
# predicates — presumably they are enforced by code outside this block; confirm.
TRANSITIONS = {
    "candidate": {
        "to_active": "多源确认(>=3个独立来源)且 strength > 0.3",
        "to_rejected": "48小时内无新来源确认",
    },
    "active": {
        "to_fading": "strength_ewma < 0.2 且 trend == weakening",
    },
    "fading": {
        "to_archived": "strength_ewma < 0.1 或 7天无更新",
        "to_active": "新来源使 strength 回升 > 0.4",
    },
}

1.4 叙事合并/分裂检测

python
async def check_merge_split():
    """Propose narrative merges and splits for human confirmation.

    Meant to run periodically (every 6 hours). Nothing is merged or split
    here: each finding is only published on the event bus as a proposal.
    """
    # Merge detection: several weak narratives that are semantically close
    # and strengthening at the same time.
    weak = await get_narratives(max_strength=0.3, trend="strengthening")
    for cluster in find_semantic_clusters(weak, threshold=0.8):
        if len(cluster) < 2:
            continue
        await event_bus.publish(Event(
            type="narrative.merge_proposed",
            payload={"narrative_ids": [n.id for n in cluster]}
        ))  # submitted for human confirmation

    # Split detection: the sources inside one strong narrative disagree on sentiment.
    strong = await get_narratives(min_strength=0.7)
    for narrative in strong:
        sentiments = [s.sentiment for s in narrative.sources]
        if len(sentiments) < 2:
            # With fewer than two sources there is no dispersion to measure;
            # np.std([]) would also emit a RuntimeWarning and return NaN.
            continue

        # Compute the dispersion once and publish it as a plain float.
        dispersion = float(np.std(sentiments))
        if dispersion > 0.4:
            await event_bus.publish(Event(
                type="narrative.split_proposed",
                # NOTE: the key says "variance" but the value is a standard
                # deviation; the key is left unchanged for consumers.
                payload={"narrative_id": narrative.id, "sentiment_variance": dispersion}
            ))  # submitted for human confirmation

边界条件

- 空新闻/噪音新闻 → LLM 返回空结果,跳过
- LLM 输出格式错误 → 重试 2 次,仍失败则记录日志跳过
- 重复新闻(相同 URL)→ 幂等去重
- 非宏观新闻(体育、娱乐)→ LLM 标签过滤

引擎 2:数据采集引擎 (Data Collection)

职责

从多个 API 获取市场数据,计算宏观指标,标记异常。

2.1 数据源接入架构

python
class DataProvider(Protocol):
    """Data-source interface — every data source implements this protocol."""

    # Declared attribute of type DataSource.
    name: DataSource

    async def fetch(self, request: DataRequest) -> DataResponse:
        """Answer ``request`` with a ``DataResponse`` (implemented by each source)."""

    async def health_check(self) -> bool:
        """Return the source's health status as a bool (implemented by each source)."""

# Concrete implementations (bodies are placeholders here)
class OpenBBProvider(DataProvider): ...    # unified layer
class IBKRProvider(DataProvider): ...     # real-time patch
class LongbridgeProvider(DataProvider): ...  # China market
class BinanceProvider(DataProvider): ...  # crypto perpetuals

class DataCollector:
    """Core of the data-collection engine: queries every provider and merges the answers."""

    # Providers queried on each collection pass.
    providers: list[DataProvider]

    async def collect_all(self) -> IndicatorSnapshot:
        """Fetch from all providers concurrently and merge what comes back.

        Exceptions raised by individual providers are returned in the result
        list (``return_exceptions=True``) instead of aborting the whole pass,
        so ``merge_results`` receives a mix of responses and exceptions.
        """
        # NOTE(review): `request` is not defined in this method or class as
        # shown — confirm where it is supposed to come from.
        pending = [provider.fetch(request) for provider in self.providers]
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
        # Merge the per-provider outcomes; degradation handling is left to merge_results.
        return self.merge_results(outcomes)

2.2 指标计算逻辑

python
class IndicatorCalculator:
    """Indicator calculations — purely deterministic code, no LLM involved.

    Historical series are read through ``self.get_history(name, days=...)``,
    which is not defined in this class as shown.
    """

    def calc_yield_curve_slope(self, rates: dict) -> IndicatorValue:
        """Build the 2s10s yield-curve slope indicator.

        Args:
            rates: Mapping that must contain the keys ``"10Y"`` and ``"2Y"``.

        Returns:
            An ``IndicatorValue`` named ``yield_curve_2s10s`` whose value is
            ``rates["10Y"] - rates["2Y"]``, flagged as an anomaly when the
            absolute z-score exceeds 2.

        Raises:
            KeyError: If either tenor is missing from ``rates``.
        """
        name = "yield_curve_2s10s"
        value = rates["10Y"] - rates["2Y"]
        # Compute the z-score once and reuse it for the anomaly flag instead
        # of fetching the history and recomputing it a second time.
        z_score = self.calc_z_score(name, value)
        return IndicatorValue(
            name=name,
            category=IndicatorCategory.rates,
            value=value,
            unit="percent",
            z_score=z_score,
            percentile=self.calc_percentile(name, value),
            trend=self.calc_trend(name),
            is_anomaly=abs(z_score) > 2,
        )

    def calc_spread(self, series_a: str, series_b: str) -> IndicatorValue:
        """Spread calculation (not implemented in this block)."""
        ...

    def calc_funding_rate_stats(self, rates: list[float]) -> IndicatorValue:
        """Funding-rate statistics (not implemented in this block)."""
        ...

    def calc_z_score(self, indicator_name: str, current: float) -> float:
        """Return the z-score of ``current`` against the last 252 days of history.

        Uses the population standard deviation (``np.std`` default). Returns
        0.0 when the history is empty or has zero spread, so callers never
        receive NaN or infinity.
        """
        history = self.get_history(indicator_name, days=252)  # one year
        # len() rather than truthiness: the history may be a NumPy array.
        if len(history) == 0:
            return 0.0
        mean = np.mean(history)
        std = np.std(history)
        return (current - mean) / std if std > 0 else 0.0

    def calc_percentile(self, indicator_name: str, current: float) -> float:
        """Return the fraction of the last 252 days strictly below ``current``.

        With no history there is nothing to rank against; 0.5 (the neutral
        midpoint, matching ``calc_z_score``'s neutral 0.0) is returned instead
        of dividing by zero.
        """
        history = self.get_history(indicator_name, days=252)
        if len(history) == 0:
            return 0.5
        return sum(1 for v in history if v < current) / len(history)

    def calc_trend(self, indicator_name: str) -> TrendDirection:
        """Classify the last 20 days as rising, falling or stable.

        A straight line is fitted to the series; a slope above 0.01 is rising,
        below -0.01 is falling, anything else stable. Fewer than two points
        cannot define a slope and are reported as stable.
        """
        recent = self.get_history(indicator_name, days=20)
        if len(recent) < 2:
            return TrendDirection.stable
        # polyfit returns coefficients highest degree first, so [0] is the slope.
        slope = np.polyfit(np.arange(len(recent)), recent, 1)[0]
        if slope > 0.01:
            return TrendDirection.rising
        elif slope < -0.01:
            return TrendDirection.falling
        else:
            return TrendDirection.stable

2.3 采集调度

python
# Collection schedule: each key names a data set and maps to its polling
# interval and the provider that serves it.
# NOTE(review): equity_indices and fx_rates poll every 5min, the same as
# orderbook_depth, yet sit in the medium-frequency group — confirm the grouping.
SCHEDULE = {
    # High frequency (real-time layer)
    "funding_rate":     {"interval": "1min",  "provider": "binance"},
    "orderbook_depth":  {"interval": "5min",  "provider": "binance"},

    # Medium frequency (daily layer)
    "yield_curve":      {"interval": "1hour", "provider": "openbb"},
    "credit_spreads":   {"interval": "1hour", "provider": "openbb"},
    "equity_indices":   {"interval": "5min",  "provider": "ibkr"},
    "fx_rates":         {"interval": "5min",  "provider": "ibkr"},

    # Low frequency
    "economic_calendar":{"interval": "1day",  "provider": "openbb"},
    "cftc_cot":         {"interval": "1week", "provider": "openbb"},
    "northbound_flow":  {"interval": "1day",  "provider": "longbridge"},
}

边界条件

- 数据源超时 → 跳过该源,标记 degraded,使用上次数据
- 数据格式异常 → 记录日志,跳过该条目
- 全部数据源故障 → 发布告警事件,保持上次快照
- 数据源限流 → 降低采集频率,记录限流状态

引擎 3:市场情绪引擎 (Market Sentiment)

职责

采集散户情绪、公众舆论、机构行为,输出情绪快照。

3.1 散户情绪采集

python
class RetailSentimentCollector:
    """Collects the retail-sentiment readings and packs them into a RetailSentiment."""

    async def collect(self) -> RetailSentiment:
        """Run the five retail readings concurrently and build the result.

        The readings are handed to ``RetailSentiment`` positionally, in the
        order listed below, so that order has to match its constructor.
        """
        pending = (
            self.get_fear_greed(),        # alternative.me
            self.get_put_call_ratio(),    # IBKR / CBOE
            self.get_margin_balance(),    # Longbridge
            self.get_social_sentiment(),  # Xueqiu / Twitter NLP
            self.get_search_trend(),      # Baidu Index / Google Trends
        )
        readings = await asyncio.gather(*pending)
        return RetailSentiment(*readings)

    async def get_social_sentiment(self) -> float:
        """Social-media sentiment: the mean of LLM scores over recent posts."""
        recent_posts = await fetch_recent_posts(limit=100)
        # The posts are scored by the LLM in one batch; the mean is returned.
        post_scores = await llm_batch_score(recent_posts, task="sentiment")
        # NOTE(review): np.mean of an empty score list yields NaN — confirm
        # callers tolerate that when no posts are available.
        return np.mean(post_scores)

3.2 媒体情绪采集

python
class MediaSentimentCollector:
    """Collects media-side sentiment: news headlines, central-bank tone, KOL views."""

    async def collect(self) -> MediaSentiment:
        """Fetch and score the three media inputs, one after another.

        ``news_volume`` is the number of headlines fetched for the last 24
        hours; both sentiment fields are means of the LLM batch scores.
        """
        # News-headline sentiment (scored by the LLM)
        recent_headlines = await fetch_recent_headlines(hours=24)
        headline_scores = await llm_batch_score(recent_headlines, task="sentiment")

        # Dovish/hawkish tone of central-bank statements (analysed by the LLM)
        cb_statements = await fetch_central_bank_statements()
        tone = await llm_analyze_central_bank_tone(cb_statements)

        # Aggregated KOL opinions
        opinion_posts = await fetch_kol_posts()
        opinion_scores = await llm_batch_score(opinion_posts, task="sentiment")

        # The means are taken only after every fetch has completed.
        headline_mean = np.mean(headline_scores)
        headline_count = len(recent_headlines)
        opinion_mean = np.mean(opinion_scores)
        return MediaSentiment(
            news_sentiment=headline_mean,
            news_volume=headline_count,
            central_bank_tone=tone,
            kol_consensus=opinion_mean,
        )

3.3 机构行为采集

python
class InstitutionalSentimentCollector:
    """Collects institutional-behaviour readings into an InstitutionalSentiment."""

    async def collect(self) -> InstitutionalSentiment:
        """Await the four readings one after another, in this order, then build the result."""
        cash_level = await self.get_fund_cash_level()  # BofA Survey
        etf_flow = await self.get_etf_flow()           # ETF.com
        cftc_net = await self.get_cftc_position()      # CFTC
        ipo_level = await self.get_ipo_activity()      # SEC filing
        return InstitutionalSentiment(
            fund_cash_level=cash_level,
            etf_flow_direction=etf_flow,
            cftc_net_position=cftc_net,
            ipo_activity=ipo_level,
        )

3.4 极端情绪检测

python
# Per-field bounds for extreme-sentiment detection: check_extreme flags a
# field when its reading is below "low" or above "high".
EXTREME_THRESHOLDS = {
    "fear_greed_index": {"low": 15, "high": 85},
    "put_call_ratio": {"low": 0.6, "high": 1.2},
    "social_sentiment": {"low": -0.7, "high": 0.7},
}

async def check_extreme(snapshot: SentimentSnapshot) -> list[str]:
    """Return the names of the fields whose reading lies outside its configured band.

    A field counts as extreme when its value is strictly below the band's
    "low" or strictly above its "high". Fields are reported in the order they
    appear in ``EXTREME_THRESHOLDS``.
    """
    def _outside(reading, band) -> bool:
        # Values exactly on a bound are not extreme.
        return reading < band["low"] or reading > band["high"]

    # NOTE(review): assumes get_nested always returns a comparable number for
    # every configured field — confirm its behaviour for missing fields.
    return [
        field
        for field, band in EXTREME_THRESHOLDS.items()
        if _outside(get_nested(snapshot, field), band)
    ]

引擎 4:智库引擎 (Think Tank / Persona Agents)

职责

基于蒸馏 Persona,以蒙特卡罗对抗辩论的形式输出理性分析。

4.1 分析触发条件

python
# Conditions that trigger a think-tank analysis. Each entry is either
# event-driven (an "event" key, optionally with a threshold) or
# schedule-driven (a "schedule" key).
TRIGGER_CONDITIONS = [
    # Event-driven
    {"event": "narrative.created", "min_strength": 0.4},
    {"event": "narrative.updated", "min_strength_change": 0.2},
    {"event": "indicators.anomaly"},
    {"event": "sentiment.extreme"},

    # Schedule-driven
    {"schedule": "daily", "time": "08:00 UTC"},   # one full analysis per day
]

4.2 Persona 独立分析

python
async def persona_analyze(
    persona: PersonaConfig,
    context: AnalysisContext
) -> PersonaJudgment:
    """Run one persona's independent analysis and return its judgment.

    The prompt combines the persona's analysis framework with the current
    narratives, indicators, sentiment and recent judgments from ``context``,
    and asks for a JSON answer. The response is validated against
    ``PersonaJudgmentSchema`` and the validated fields are expanded into the
    ``PersonaJudgment`` together with the persona's id and current version.
    """

    # The prompt text (including its leading indentation) is sent to the LLM
    # as written, so it is part of the runtime behaviour.
    prompt = f"""
    {persona.analysis_framework}

    ---

    当前叙事状态:
    {format_narratives(context.narratives)}

    当前市场指标:
    {format_indicators(context.indicators)}

    当前市场情绪:
    {format_sentiment(context.sentiment)}

    历史判断参考:
    {format_history(context.recent_judgments)}

    请按照 {persona.scoring_rubric} 的标准,给出你的独立判断。

    输出 JSON 格式:
    - regime: MacroRegime 枚举
    - regime_confidence: [0, 1]
    - narrative_assessment: string(你的叙事分析)
    - narrative_belief: [-1, 1]
    - scenarios: list[Scenario]
    - reasoning_chain: string(完整推理过程)
    """

    response = await llm.generate(
        prompt,
        model=ANALYSIS_MODEL,      # Claude Sonnet / GPT-4o
        temperature=0.1,           # low temperature to reduce randomness
        response_format={"type": "json_object"}
    )

    # NOTE(review): the validated dict is splatted as keyword arguments, so it
    # must not contain persona_id or persona_version — confirm the schema.
    return PersonaJudgment(
        persona_id=persona.id,
        persona_version=persona.current_version,
        **validate_json(response, PersonaJudgmentSchema)
    )

4.3 对抗辩论

python
async def run_debate(
    judgments: list[PersonaJudgment],
    context: AnalysisContext
) -> list[DebateRound]:
    """Run up to ``MAX_DEBATE_ROUNDS`` rounds of adversarial debate.

    In each round every ordered pair of distinct personas with a significant
    disagreement produces one challenge. A single LLM call writes both the
    challenge and the simulated response of the target persona.

    Returns:
        The list of rounds, each holding that round's challenges.
    """

    rounds = []

    # NOTE(review): `judgments` is not modified inside this loop, so unless
    # has_significant_disagreement changes between calls, every round asks the
    # LLM about the same pairs — confirm whether rounds should build on earlier ones.
    for round_num in range(1, MAX_DEBATE_ROUNDS + 1):
        challenges = []

        for challenger in judgments:
            for target in judgments:
                # A persona does not challenge itself.
                if challenger.persona_id == target.persona_id:
                    continue

                # Skip pairs without a substantive disagreement.
                if not has_significant_disagreement(challenger, target):
                    continue

                # Build the challenge prompt.
                challenge_prompt = f"""
                你是 {challenger.persona_id},你的判断是:
                Regime: {challenger.regime} (confidence: {challenger.regime_confidence})
                叙事看法: {challenger.narrative_assessment}

                你正在质疑 {target.persona_id},他的判断是:
                Regime: {target.regime} (confidence: {target.regime_confidence})
                叙事看法: {target.narrative_assessment}

                当前数据:
                {format_indicators(context.indicators)}

                请提出你的质疑(具体、有数据支撑、指向对方推理的薄弱环节)。
                然后模拟 {target.persona_id} 的回应。

                输出 JSON:{{"challenge": "...", "response": "..."}}
                """

                result = await llm.generate(challenge_prompt, temperature=0.2)
                challenges.append(DebateChallenge(
                    challenger_id=challenger.persona_id,
                    target_id=target.persona_id,
                    **validate_json(result, DebateChallengeSchema)
                ))

        # The round is recorded before the early-exit check, so a round with
        # no challenges still appears in the returned list.
        rounds.append(DebateRound(round_number=round_num, challenges=challenges))

        # Stop early when this round produced no challenges.
        if not challenges:
            break

    return rounds

4.4 Ensemble 合成

python
async def synthesize(
    judgments: list[PersonaJudgment],
    persona_weights: dict[str, float],
    debate_rounds: list[DebateRound],
    context: AnalysisContext | None = None,
) -> JudgmentSynthesis:
    """Combine the persona judgments into one weighted final judgment.

    Args:
        judgments: Judgments to combine; must not be empty.
        persona_weights: Weight per persona id; every judgment's persona id
            must be present.
        debate_rounds: Debate transcript, used to extract the key disagreements.
        context: Analysis context supplying the indicators for the alignment
            and tension assessments. The body needs it, so it is now an
            explicit parameter; the default exists only to keep the
            three-argument call shape importable.

    Returns:
        The synthesized judgment.

    Raises:
        ValueError: If ``judgments`` is empty or ``context`` is not supplied.
        KeyError: If a persona id has no entry in ``persona_weights``.
    """
    if not judgments:
        raise ValueError("synthesize() requires at least one persona judgment")
    if context is None:
        raise ValueError(
            "synthesize() requires the analysis context (pass context=...)"
        )

    # Weighted regime vote: each persona contributes weight * confidence.
    regime_votes: dict[MacroRegime, float] = defaultdict(float)
    for j in judgments:
        regime_votes[j.regime] += persona_weights[j.persona_id] * j.regime_confidence

    final_regime = max(regime_votes, key=regime_votes.get)
    # Guard the normalisation: all-zero confidences or weights would divide by zero.
    total_votes = sum(regime_votes.values())
    final_confidence = (
        regime_votes[final_regime] / total_votes if total_votes > 0 else 0.0
    )

    # Weighted average of the narrative belief, with the same zero guard.
    total_weight = sum(persona_weights[j.persona_id] for j in judgments)
    weighted_belief = sum(
        j.narrative_belief * persona_weights[j.persona_id]
        for j in judgments
    )
    narrative_belief = weighted_belief / total_weight if total_weight else 0.0

    # Consensus level across personas.
    consensus = calculate_consensus(judgments)

    # Points of disagreement, drawn from the judgments and the debate.
    disagreements = extract_disagreements(judgments, debate_rounds)

    return JudgmentSynthesis(
        regime=final_regime,
        regime_confidence=final_confidence,
        narrative_data_alignment=assess_alignment(narrative_belief, context.indicators),
        key_tension=identify_tension(judgments, context),
        scenarios=merge_scenarios(judgments),
        persona_consensus_level=consensus,
        key_disagreements=disagreements,
        invalidation_conditions=extract_invalidation(judgments),
    )

边界条件

- 单个 Persona LLM 调用超时 → 跳过该 Persona,降低 ensemble 置信度
- 所有 Persona 超时 → 记录错误,不输出判断
- 辩论陷入循环 → 最多 3 轮,强制结束
- 所有 Persona 高度一致 → 触发"反共识"检查(是否有被忽略的风险)

引擎 5:错配检测引擎 (Mismatch Detection)

职责

将智库引擎的理性分析与市场情绪进行对比,分类 Alpha 和 Beta 信号。

5.1 计算逻辑

python
async def detect_mismatch(judgment: Judgment) -> MismatchScore:
    """Score the gap between the think-tank judgment and the latest market sentiment.

    The scoring is plain arithmetic; the only awaited work is loading the
    latest sentiment and the duration of the current mismatch.
    """
    sentiment = await sentiment_store.get_latest_sentiment()

    # 1. Think-tank consensus direction in [-1, 1].
    #    Per the original note: expansion/recovery -> +1, recession/deflation -> -1,
    #    stagflation/late_cycle -> 0 ± 0.3.
    synthesis = judgment.synthesis
    persona_direction = synthesis.regime_to_direction()
    persona_confidence = synthesis.regime_confidence

    # 2. Market sentiment direction in [-1, 1]: a weighted blend of four
    #    normalised readings (weights 0.3 / 0.3 / 0.2 / 0.2).
    fear_greed = normalize(sentiment.retail.fear_greed_index, 0, 100, -1, 1)
    news = normalize(sentiment.media.news_sentiment, -1, 1, -1, 1)
    social = normalize(sentiment.retail.social_sentiment, -1, 1, -1, 1)
    etf_flow = normalize(sentiment.institutional.etf_flow_direction, -1, 1, -1, 1)
    sentiment_direction = fear_greed * 0.3 + news * 0.3 + social * 0.2 + etf_flow * 0.2
    sentiment_intensity = abs(sentiment_direction)

    # 3. Raw mismatch: think-tank direction minus sentiment direction.
    mismatch_raw = persona_direction - sentiment_direction

    # 4. Persistence adjustment (needs the mismatch history).
    duration = await calc_mismatch_duration(judgment.narrative_snapshot_id, mismatch_raw)
    mismatch_sustained = mismatch_raw * duration.total_hours

    # 5. Quadrant classification.
    quadrant = classify_quadrant(persona_direction, sentiment_direction)

    # 6. Signal strength, capped at 1.0. Persistence counts more the longer
    #    the mismatch lasts but saturates after 7 days.
    persistence = min(duration.total_days / 7, 1.0)
    blended = abs(mismatch_raw) * 0.4 + persona_confidence * 0.3 + persistence * 0.3
    signal_strength = min(1.0, blended)

    return MismatchScore(
        judgment_id=judgment.judgment_id,
        persona_consensus_direction=persona_direction,
        persona_consensus_confidence=persona_confidence,
        sentiment_direction=sentiment_direction,
        sentiment_intensity=sentiment_intensity,
        mismatch_raw=mismatch_raw,
        mismatch_duration=duration,
        mismatch_sustained=mismatch_sustained,
        quadrant=quadrant,
        signal_strength=signal_strength,
    )

def classify_quadrant(
    persona_dir: float,
    sentiment_dir: float
) -> MismatchQuadrant:
    """Map the two direction signs onto a mismatch quadrant.

    * think tank positive, sentiment negative -> alpha_long
    * think tank negative, sentiment positive -> alpha_short
    * both positive                           -> beta_long
    * everything else                         -> beta_short

    "Everything else" covers both-negative and also any input where either
    direction is exactly 0.
    NOTE(review): confirm that neutral (zero) directions are meant to land in
    beta_short.
    """
    persona_up = persona_dir > 0
    persona_down = persona_dir < 0
    crowd_up = sentiment_dir > 0
    crowd_down = sentiment_dir < 0

    if persona_up and crowd_down:
        return MismatchQuadrant.alpha_long
    if persona_down and crowd_up:
        return MismatchQuadrant.alpha_short
    if persona_up and crowd_up:
        return MismatchQuadrant.beta_long
    return MismatchQuadrant.beta_short

5.2 信号过滤

不是所有错配都值得行动:

python
# Minimum requirements a mismatch has to meet before it is raised as a signal.
SIGNAL_THRESHOLDS = {
    "min_mismatch_raw": 0.3,        # mismatch magnitude threshold
    "min_confidence": 0.5,           # minimum think-tank confidence
    "min_duration_hours": 24,        # minimum persistence
    "min_signal_strength": 0.5,      # minimum signal strength
}

async def should_signal(mismatch: MismatchScore) -> bool:
    """Return True only when the mismatch meets every threshold in SIGNAL_THRESHOLDS.

    All four comparisons are inclusive (``>=``); the raw mismatch is compared
    by absolute value.
    """
    limits = SIGNAL_THRESHOLDS

    # Each guard is written as "not (x >= limit)" so that NaN readings are
    # rejected exactly like any other failed comparison.
    if not (abs(mismatch.mismatch_raw) >= limits["min_mismatch_raw"]):
        return False
    if not (mismatch.persona_consensus_confidence >= limits["min_confidence"]):
        return False
    if not (mismatch.mismatch_duration.total_hours >= limits["min_duration_hours"]):
        return False
    return mismatch.signal_strength >= limits["min_signal_strength"]

引擎 6:记忆系统 (Memory System)

职责

横贯所有引擎的基础设施:持久化、查询、快照、元学习。

6.1 事件持久化

python
class EventStore:
    """Event store — the events of every engine pass through here."""

    async def append(self, event: Event):
        """Append one event; stored events are treated as immutable.

        The payload is serialised with ``json.dumps``, so it has to be
        JSON-serialisable.
        """
        # NOTE(review): this reads event.event_id / event.event_type /
        # event.source_engine, while other snippets in this file build events
        # as Event(type=..., payload=...) — confirm the field names in the
        # interface contract.
        await db.execute(
            "INSERT INTO events (event_id, event_type, timestamp, source_engine, payload) "
            "VALUES ($1, $2, $3, $4, $5)",
            event.event_id, event.event_type, event.timestamp,
            event.source_engine, json.dumps(event.payload)
        )

    async def get_events(
        self,
        event_type: str | None = None,
        source_engine: str | None = None,
        since: datetime | None = None,
        limit: int = 100,
        *,
        event_type__startswith: str | None = None,
        payload__contains: dict | None = None,
    ) -> list[Event]:
        """Query events (not implemented in this block).

        Args:
            event_type: Exact event type to match, if given.
            source_engine: Emitting engine to match, if given.
            since: Lower bound on the event timestamp, if given.
            limit: Maximum number of events to return.
            event_type__startswith: Event-type prefix to match, if given.
                Keyword-only; accepted so that the snapshot code's
                ``event_type__startswith="narrative."`` call is valid.
            payload__contains: Key/value pairs the payload must contain, if
                given. Keyword-only; used by the snapshot code to select one
                narrative's events.
        """
        ...

6.2 快照生成

python
class SnapshotManager:
    """Generates snapshots periodically so queries do not have to replay every event."""

    # Snapshot kind -> generation cadence.
    SCHEDULE = {
        "narrative_snapshots": "every_6_hours",
        "indicator_snapshots": "every_1_hour",
        "sentiment_snapshots": "every_1_hour",
    }

    async def generate_narrative_snapshot(self, narrative_id: str):
        """Rebuild one narrative's current state from its events and upsert it."""
        # NOTE(review): confirm that get_events accepts these filter keywords
        # and that its default limit cannot truncate the replayed stream.
        payload_filter = {"narrative_id": narrative_id}
        narrative_events = await event_store.get_events(
            event_type__startswith="narrative.",
            payload__contains=payload_filter,
        )
        # Replaying the event stream yields the current state.
        current_state = replay_events(narrative_events)
        await db.upsert("narrative_snapshots", current_state)

6.3 元学习

python
class MetaLearner:
    """Periodically reviews system performance and emits improvement proposals."""

    async def run_weekly_review(self):
        """Review the last 7 days of validated judgments and propose persona changes.

        Proposals are never applied here: each one is published as a
        ``persona.evolution_proposed`` event for human approval.
        """
        # 1. Validated judgments from the review window.
        window_start = datetime.now() - timedelta(days=7)
        validated = await judgment_store.get_validated(since=window_start)

        # 2. Per-persona list of correct/incorrect outcomes.
        outcomes_by_persona = {}
        for judgment in validated:
            for persona_id, correct in judgment.validation.persona_accuracy.items():
                outcomes_by_persona.setdefault(persona_id, []).append(correct)

        # 3. Performance broken down by regime.
        regime_analysis = self.analyze_by_regime(validated)

        # 4. Misjudgment patterns by narrative type.
        # NOTE(review): this result is not used further down in this method.
        narrative_errors = self.analyze_narrative_errors(validated)

        # 5. One proposal for every persona whose accuracy is below 40%.
        proposals = []
        for persona_id, outcomes in outcomes_by_persona.items():
            hit_rate = sum(outcomes) / len(outcomes)
            if hit_rate < 0.4:
                weakness = f"本周期准确率 {hit_rate:.0%}"
                proposals.append(PersonaEvolutionProposal(
                    persona_id=persona_id,
                    identified_weakness=weakness,
                    evidence=self.collect_evidence(persona_id, validated),
                    proposed_changes=self.suggest_changes(persona_id, regime_analysis),
                ))

        # 6. Publish every proposal for human approval.
        for proposal in proposals:
            await event_bus.publish(Event(
                type="persona.evolution_proposed",
                payload=proposal.dict()
            ))

    def analyze_by_regime(self, judgments) -> dict:
        """Analyse each persona's performance under the different regimes (stub)."""
        ...

    def analyze_narrative_errors(self, judgments) -> list:
        """Analyse misjudgment patterns by narrative type (stub)."""
        ...

6.4 事后验证

python
class ValidationEngine:
    """After-the-fact validation of judgments."""

    async def validate_judgment(self, judgment: Judgment, lookforward_days: int = 7):
        """Check a past judgment against what the indicators did afterwards.

        The regime inferred from the judgment-time snapshot and the latest
        indicators is compared with the synthesized regime and with each
        persona's regime. The result is attached to the judgment and fed into
        the per-persona performance statistics.

        NOTE(review): ``lookforward_days`` is not read anywhere in this method
        — the comparison always uses the latest indicators; confirm the caller
        enforces the waiting period.
        """
        # Indicators at judgment time and now.
        then_indicators = await data_store.get_snapshot(judgment.indicator_snapshot_id)
        now_indicators = await data_store.get_latest_indicators()

        # The regime that actually materialised.
        actual_regime = self.infer_actual_regime(then_indicators, now_indicators)

        # Whether each persona called the regime correctly.
        persona_accuracy = {
            pj.persona_id: (pj.regime == actual_regime)
            for pj in judgment.persona_judgments
        }

        # Build the validation record; the timestamp is taken before the
        # lessons are generated.
        validated_at = datetime.now()
        outcome_text = f"实际 regime: {actual_regime}"
        synthesis_correct = judgment.synthesis.regime == actual_regime
        lessons = await self.generate_lessons(judgment, actual_regime)
        validation = JudgmentValidation(
            validated_at=validated_at,
            actual_outcome=outcome_text,
            regime_was_correct=synthesis_correct,
            persona_accuracy=persona_accuracy,
            lessons_learned=lessons,
        )

        # Attach the validation to the stored judgment.
        await judgment_store.attach_validation(judgment.judgment_id, validation)

        # Update each persona's performance statistics.
        for persona_id, correct in persona_accuracy.items():
            await persona_store.update_performance(persona_id, correct)

引擎间依赖与执行顺序

独立运行(无依赖):
  ② 数据采集  ←── 定时
  ③ 情绪采集  ←── 定时

依赖外部新闻输入(事件驱动):
  ① 叙事发现  ←── 新闻输入

依赖 ①②③ 的输出:
  ④ 智库引擎  ←── 需要叙事 + 指标 + 情绪快照

依赖 ④ 的输出:
  ⑤ 错配检测  ←── 需要判断结果

横贯所有:
  ⑥ 记忆系统  ←── 被动接收所有事件

关联文档:规范总览 · 模块间接口规范 · 实现阶段与验收标准

基于 VitePress 构建