From ef6fbc152189ba81cdf6275508dc1e3ab64f33a6 Mon Sep 17 00:00:00 2001 From: root <953994191@qq.com> Date: Wed, 6 May 2026 04:26:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E7=90=86=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 131 +++++++++++++++++++ backend/app/agent/prompts.py | 17 +-- backend/app/core/intent.py | 71 +++++++--- backend/app/main_graph/main_graph_builder.py | 17 +-- backend/app/main_graph/nodes/fast_paths.py | 3 +- backend/app/main_graph/nodes/llm_call.py | 45 +++---- backend/app/main_graph/nodes/rag_nodes.py | 95 ++++++++------ backend/app/main_graph/nodes/reasoning.py | 2 + backend/app/main_graph/nodes/routing.py | 4 +- backend/app/rag/pipeline.py | 31 ++++- tools/start.py | 2 +- tools/test/test_graph_branches.py | 24 ++-- 12 files changed, 313 insertions(+), 129 deletions(-) diff --git a/README.md b/README.md index 488a354..068bf1f 100644 --- a/README.md +++ b/README.md @@ -2043,3 +2043,134 @@ curl http://115.190.121.151:6333/collections ## 🤝 贡献 欢迎提交 Issue 和 Pull Request! + +--- +### TODO + +## 1.llm的BUG修复 +看了你的代码,目前的核心问题在于**推理节点和回答节点的职责虽然分开了,但实现上仍有混淆**,导致流程混乱。下面帮你梳理清楚。 + +--- + +## 当前架构分析 + +### ✅ 正确的部分 +- **`llm_call` 节点**已经使用了**无工具模型**(`models[model_name]`,没有 `bind_tools`),只负责根据上下文生成最终回答。这是对的。 +- **`AIAgentService`** 中,`process_message_stream` 处理了 `tool_calls` 事件,说明你预料到推理 LLM 可能产生 tool call。 + +### ❌ 混乱的根源 +- **推理 LLM 很可能仍然绑定了工具**。 + 在 `build_react_main_graph(chat_services=self.chat_services, tools=self.tools, ...)` 中,你很可能在构建推理节点时,将 `tools` 传给了推理 LLM(`llm.bind_tools(tools)`)。这会导致: + 1. 推理 LLM 直接输出 OpenAI 格式的 function call,而不是你期望的 `{"action": "RETRIEVE_RAG"}` 文本。 + 2. LangGraph 会自动解析这些 tool call 并执行(如果工具节点被正确连接),这就绕过了你自己的路由逻辑(条件边)。 + 3. 你的流式处理代码(`_handle_message_chunk`)收到了 `tool_calls`,但只是把它们当作事件转发给前端,并没有真正由你的代码去执行工具,导致工具调用可能由 LangGraph 自动完成,或者完全丢失。 +- **流式处理中的 `_handle_message_chunk`** 处理了 `tool_calls`,但实际上这些 tool call 如果被 LangGraph 自动执行了,前端会看到工具调用,但之后突然就跳到了回答,中间缺乏透明控制。如果推理 LLM 输出的是文本动作(比如 JSON),则 `tool_calls` 永远不会出现,这段处理逻辑就成了死代码。 + +--- + +## 应该实现的目标架构(也是你一贯讨论的) +``` +推理节点 (react_reason) + ├─ 使用不带工具绑定的 LLM + ├─ Prompt 中列出可用动作:RETRIEVE_RAG, WEB_SEARCH, DIRECT_RESPONSE + ├─ 输出结构化 JSON:{"action": "...", "query": "...", "reasoning": "..."} + └─ 你的代码解析 JSON → 修改 state.next_action + +条件边 (route_by_reasoning) + ├─ 读取 state.next_action + ├─ 映射到工具节点:rag_retrieve, web_search_node + └─ 或直接进入 llm_call 节点 + +工具节点 (rag_retrieve / web_search_node) + ├─ 执行实际检索/搜索 + └─ 将结果写回 state.rag_context, state.retrieved_docs + +回答节点 (llm_call) + ├─ 使用无工具 LLM + ├─ 基于 state.rag_context 生成最终回答 + └─ 绝不调用任何工具 +``` + +--- + +## 需要现在修改的地方(按优先级) + +### 1. 推理节点:移除工具绑定,改为文本决策 +在 `build_react_main_graph` 中(或在推理节点的构造代码里),确保推理 LLM 是通过 `prompt | llm` 而不是 `prompt | llm.bind_tools(tools)` 调用的。 +**做法**: +- 推理节点的 system prompt 里列出可用动作及格式要求。 +- 推理 LLM 只输出 JSON,例如: + ```json + {"action": "RETRIEVE_RAG", "search_query": "吕布 事迹 三国演义"} + ``` +- 在推理节点后增加一个解析函数,将 JSON 解析为具体的 `action` 和参数,更新到 state 中。 + +### 2. 条件边:根据解析的 `action` 干净路由 +确保 `route_by_reasoning` 使用解析后的 `state.next_action`(字符串)进行路由,而不是再去检查 `history` 或 `retrieved_docs`。同时将**置信度阈值判断**放在路由之前或放在推理节点的 prompt 中(让 LLM 决策时就遵守规则),避免在条件边里重复判断。 + +### 3. llm_call 节点:保持当前的无工具状态 ✅ +当前已正确,无需改动。唯一建议:确保 `models[model_name]` 确实没有绑定工具。 + +### 4. 流式处理:简化事件处理 +如果推理节点不再输出 tool call,可以将 `_handle_message_chunk` 中的 tool_call 处理分支删除或注释,避免混淆。 +未来如果你想展示推理过程,可以发送 custom event(如你现在做的 react_reasoning 事件)。 + +--- + +## 你现在应该做的具体步骤 +1. **检查 `build_react_main_graph` 函数**,找到推理节点的创建代码,确认是否调用了 `llm.bind_tools()`。如果调用了,改为 `llm`,并更新 prompt 为 JSON 输出。 +2. **确保推理节点的 prompt 包含以下内容**: + - 当前状态(RAG 置信度、尝试次数、已有的检索结果摘要) + - 决策规则(置信度阈值、最大重试次数等) + - 要求输出纯 JSON(不要代码块标记),格式为 `{"action": "...", "args": {...}}` +3. **在推理节点返回后,添加一个解析函数**,提取 `action` 和参数,设置 `state.next_action`、`state.rag_query` 等字段。 +4. **修改条件边**,直接根据 `state.next_action` 跳转。 +5. **测试**:运行“吕布的事迹?”查询,应该看到推理节点输出 `RETRIEVE_RAG`,然后 `rag_retrieve` 执行,再次推理(如果置信度低),或者直接 `DIRECT_RESPONSE` → `llm_call` 生成回答。最终回答应基于检索到的吕布相关文本,而不是无关片段。 + +--- + +## 总结 +**当前逻辑有问题**,主要是因为推理节点可能仍绑定了工具,导致 tool call 自动执行,打乱了你的路由控制。按上述方案调整后,Agent 的决策和执行会变得透明、可控,职责分明。如果你需要,我可以帮你重写推理节点的核心逻辑。 + +## 2.优化:实现推理验证 +1. 在 React 循环中增加“验证”步骤 +在推理 LLM 输出 DIRECT_RESPONSE 后,不直接返回给用户,而是先进入一个 validate_answer 节点: + +text +推理节点 → DIRECT_RESPONSE → validate_answer → 合格?→ 返回用户 + ↓ 不合格 + 重新规划动作(如重新检索) +验证内容:检查回答是否自洽、引用依据是否充分、是否回答了用户问题等。 + +2. 使用 LLM 自省(Self-Reflection) +在 validate_answer 节点里调用一个专门的校验 LLM(可以是轻量模型),给它这样的 prompt: + +text +你是一个严格的校验员。请检查以下回答是否满足要求: + +【用户问题】 +{user_query} + +【检索到的资料】 +{rag_context} + +【生成的回答】 +{llm_response} + +请判断: +1. 回答是否基于给定的资料? +2. 回答是否直接回应了用户问题? +3. 回答是否存在事实错误或逻辑漏洞? + +输出 JSON:{"pass": true/false, "reason": "..."} +如果 pass = false,则退回推理节点重新规划(如重新检索或联网搜索)。 + +3. 在 System Prompt 里要求推理节点评估回答质量 +你可以在推理节点的 prompt 里增加一条规则: + +text +当你决定 DIRECT_RESPONSE 并收到 llm_call 的回答后,必须自我检查: +- 回答是否与检索到的资料一致? +- 是否回答了用户核心问题? +如果发现不一致或遗漏,必须重新规划。 +这相当于把反思逻辑融入了推理循环。 \ No newline at end of file diff --git a/backend/app/agent/prompts.py b/backend/app/agent/prompts.py index 42024c5..cc3a973 100644 --- a/backend/app/agent/prompts.py +++ b/backend/app/agent/prompts.py @@ -1,23 +1,12 @@ # app/prompts.py from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -def create_system_prompt(tools: list = None) -> ChatPromptTemplate: +def create_system_prompt() -> ChatPromptTemplate: """ 创建系统提示模板,整合多子系统能力、检索策略与回答规范。 """ - # 构造工具描述 - tools_section = "无可用工具" - if tools: - tool_descs = [] - for tool in tools: - name = getattr(tool, 'name', None) or getattr(tool, '__name__', 'unknown_tool') - desc = (tool.description or "").split('\n')[0] - tool_descs.append(f"- {name}: {desc}") - tools_section = "\n".join(tool_descs) - # 使用 f-string 将 tools_section 直接嵌入,而 memory_context 用双花括号转义保留为变量 system_template = f'''你是一个智能助手,具备以下专业子系统和检索能力。请使用中文交流。 - ## 核心功能 1. 📚 词典/翻译子系统 – 查询单词、翻译文本、提取术语、每日一词 2. 📰 资讯分析子系统 – 查询新闻、分析URL、提取关键词、生成报告 @@ -34,10 +23,6 @@ def create_system_prompt(tools: list = None) -> ChatPromptTemplate: - 第3次决定获取信息时,必须选择**联网搜索**,禁止无休止的本地检索。 - 如果已经明确知识库不包含该信息(例如用户询问实时新闻),可以直接进入联网搜索。 -## 可用工具 -{tools_section} -工具调用时请直接返回所需参数,无需额外说明。 - ## 用户背景信息 以下是当前用户的已知信息和长期记忆,你应在回答中优先利用这些信息进行个性化回复: {{memory_context}} diff --git a/backend/app/core/intent.py b/backend/app/core/intent.py index ba1b6f7..e057fb6 100644 --- a/backend/app/core/intent.py +++ b/backend/app/core/intent.py @@ -130,41 +130,60 @@ class ReactIntentReasoner: retrieved_docs = context.get("retrieved_docs", []) messages = context.get("messages", []) - # 关键修改:不要在第一次 rag_retrieve 后就直接回答,允许再推理一次 - # 让推理逻辑有机会判断 RAG 结果好不好,要不要再检索或转 web search + # 获取 RAG 相关状态 previous_actions = context.get("previous_actions", []) - rag_count = previous_actions.count("RETRIEVE_RAG") # 修复:大写 - web_search_count = previous_actions.count("web_search") + rag_count = previous_actions.count("RETRIEVE_RAG") + rag_attempts = context.get("rag_attempts", rag_count) + rag_confidence = context.get("rag_confidence", 0.0) retrieved_docs = context.get("retrieved_docs", []) - - # 如果已经有检索文档了,直接回答 - if retrieved_docs and len(retrieved_docs) > 0: - result.action = ReasoningAction.DIRECT_RESPONSE - result.confidence = 0.95 - result.reasoning = "已获取检索文档,直接回答" - return result - - # 只有当 rag 或 web search 已经超过 1 次,或者已经有推理在 rag 之后,才直接回答 - if rag_count >= 2 or web_search_count >= 1: - result.action = ReasoningAction.DIRECT_RESPONSE - result.confidence = 0.95 - result.reasoning = "已获取足够信息,直接回答" - return result + web_search_count = previous_actions.count("web_search") # 检查 RAG 是否多次失败(reasoning_history 中有失败的 RAG 记录) - # 失败的 RAG 记录特征:confidence = 0.0 rag_history = context.get("reasoning_history", []) rag_fail_count = sum( 1 for h in rag_history if h.get("action") in ("RETRIEVE_RAG", "RE_RETRIEVE_RAG") and h.get("confidence", 1.0) == 0.0 ) + + # 如果有检索文档,根据置信度判断下一步 + if retrieved_docs and len(retrieved_docs) > 0: + if rag_confidence >= 0.6: + # 置信度足够高,直接回答 + result.action = ReasoningAction.DIRECT_RESPONSE + result.confidence = 0.95 + result.reasoning = f"已获取检索文档,置信度={rag_confidence:.2f},直接回答" + return result + elif rag_attempts >= 2 or rag_fail_count >= 2: + # 尝试次数已够或多次失败,放弃 RAG,转向联网搜索 + result.action = ReasoningAction.WEB_SEARCH + result.confidence = 0.8 + result.reasoning = f"RAG 置信度={rag_confidence:.2f} < 0.6,且已尝试 {rag_attempts} 次,转向联网搜索" + result.metadata["need_web_search"] = True + result.metadata["search_query"] = query + return result + else: + # 置信度不够但还有尝试机会,再查一次 + result.action = ReasoningAction.RETRIEVE_RAG + result.confidence = 0.8 + result.reasoning = f"已获取检索文档但置信度={rag_confidence:.2f} < 0.6,可再尝试一次" + result.retrieval_config.need_retrieval = True + result.retrieval_config.retrieval_query = query + return result + + # 如果 RAG 已多次失败且无文档,直接回答(基于常识) if rag_fail_count >= 2: - # RAG 多次失败,应该直接回答而不是继续重试 result.action = ReasoningAction.DIRECT_RESPONSE result.confidence = 0.7 result.reasoning = f"RAG 已尝试 {rag_fail_count} 次均失败,知识库无相关内容,直接基于常识回答" return result + # 如果 web search 已执行过,直接回答 + if web_search_count >= 1: + result.action = ReasoningAction.DIRECT_RESPONSE + result.confidence = 0.95 + result.reasoning = "已获取联网搜索结果,直接回答" + return result + # 策略1:尝试使用 LLM 推理 try: llm_result = await self._reason_with_llm(query, context) @@ -194,6 +213,10 @@ class ReactIntentReasoner: context_parts = [] if context.get("retrieved_docs"): context_parts.append(f"- 已检索文档: {len(context['retrieved_docs'])} 条") + if context.get("rag_confidence") is not None: + context_parts.append(f"- RAG 置信度: {context['rag_confidence']:.2f}") + if context.get("rag_attempts"): + context_parts.append(f"- RAG 尝试次数: {context['rag_attempts']}") if context.get("previous_actions"): context_parts.append(f"- 历史动作: {context['previous_actions']}") @@ -202,7 +225,7 @@ class ReactIntentReasoner: return f"""你是一个专业的意图推理助手。请分析用户的查询,决定下一步应该做什么。 可选动作: -1. DIRECT_RESPONSE - 直接回答(闲聊、打招呼、不需要额外信息) +1. DIRECT_RESPONSE - 直接回答(闲聊、打招呼、不需要额外信息,或已有足够信息) 2. RETRIEVE_RAG - 需要查询知识库(询问知识、政策、文档等) 3. RE_RETRIEVE_RAG - 需要重新检索(之前的结果不够,或者用户明确说"再查查"、"更多") 4. WEB_SEARCH - 需要联网搜索(询问最新资讯、热点、实时信息、知识库中没有的内容) @@ -212,6 +235,12 @@ class ReactIntentReasoner: - news_analysis: 资讯、新闻、热点分析相关 6. CLARIFY - 需要澄清用户的问题(问题不明确) +判断规则: +- 如果 RAG 置信度 >= 0.6 且有检索文档,应返回 DIRECT_RESPONSE +- 如果 RAG 置信度 < 0.6 且尝试次数 < 2,可返回 RETRIEVE_RAG 再试一次 +- 如果 RAG 置信度 < 0.6 且尝试次数 >= 2,应返回 WEB_SEARCH +- 如果已联网搜索过,应返回 DIRECT_RESPONSE + 用户查询: {query} 当前上下文: {context_str} diff --git a/backend/app/main_graph/main_graph_builder.py b/backend/app/main_graph/main_graph_builder.py index 7937908..922ac83 100644 --- a/backend/app/main_graph/main_graph_builder.py +++ b/backend/app/main_graph/main_graph_builder.py @@ -21,7 +21,7 @@ from .nodes.fast_paths import ( fast_tool_node, ) from .nodes.llm_call import create_dynamic_llm_call_node -from .nodes.rag_nodes import rag_retrieve_node, check_rag_confidence +from .nodes.rag_nodes import rag_retrieve_node from .nodes.retrieve_memory import create_retrieve_memory_node from .nodes.memory_trigger import memory_trigger_node, set_mem0_client from .nodes.summarize import create_summarize_node @@ -164,7 +164,7 @@ def _add_routing_edges(graph: StateGraph, use_hybrid_router: bool, llm_node) -> } ) - # 快速路径的完成检查 + # 快速路径的完成检查(fast_rag 失败直接走 react_reason) for fast_node in ["fast_chitchat", "fast_rag", "fast_tool"]: graph.add_conditional_edges( fast_node, @@ -198,17 +198,8 @@ def _add_react_loop_edges(graph: StateGraph, subgraph_nodes: Dict[str, Any]) -> } ) - # RAG 检索后的置信度判断分支 - graph.add_conditional_edges( - "rag_retrieve", - check_rag_confidence, - { - "high_confidence": "llm_call", # 高置信度 → 直接生成回答 - "retry_rag": "rag_retrieve", # 低置信度 → 再次检索 - "low_confidence": "web_search", # 两次RAG后仍低 → 联网搜索 - "no_rag": "web_search", # 无结果 → 联网搜索 - } - ) + # RAG 检索后回到 react_reason,由意图识别决定下一步 + graph.add_edge("rag_retrieve", "react_reason") # 循环边(回到 react_reason) loop_back_nodes = ["web_search", "handle_error"] + subgraph_names diff --git a/backend/app/main_graph/nodes/fast_paths.py b/backend/app/main_graph/nodes/fast_paths.py index 6dc7fec..0dce7a2 100644 --- a/backend/app/main_graph/nodes/fast_paths.py +++ b/backend/app/main_graph/nodes/fast_paths.py @@ -103,8 +103,9 @@ async def fast_rag_node(state: MainGraphState, config: Optional[RunnableConfig] # 注意:这里不设置 final_result,让 llm_call 节点处理 return state - # 无效结果:升级到 React 循环 + # 检索结果无效:标记失败,升级到 React 循环 info("[Fast RAG] 无有效检索结果,升级到 React 循环") + await dispatch_custom_event("fast_path_end", {"path": "fast_rag", "success": False}, config) return _mark_fast_path_failed(state, "无有效检索结果") except Exception as e: diff --git a/backend/app/main_graph/nodes/llm_call.py b/backend/app/main_graph/nodes/llm_call.py index 9ab9fd2..a6fb141 100644 --- a/backend/app/main_graph/nodes/llm_call.py +++ b/backend/app/main_graph/nodes/llm_call.py @@ -18,24 +18,20 @@ from backend.app.logger import debug, info, error def create_dynamic_llm_call_node(chat_services: Dict[str, BaseChatModel], tools: list): """ 工厂函数:创建动态 LLM 调用节点(根据 state.current_model 选择模型) - + Args: chat_services: 模型名称 -> ChatModel 实例 的字典 - tools: 工具列表 - + tools: 工具列表(llm_call 不使用工具,只负责回答) + Returns: 异步节点函数 """ - # 预构建所有模型的 tools 绑定(避免每次调用都 bind) - bound_models: Dict[str, Any] = {} - for name, llm in chat_services.items(): - if tools: - bound_models[name] = llm.bind_tools(tools) - else: - bound_models[name] = llm - - # 预构建 prompt - prompt = create_system_prompt(tools) + # llm_call 节点不使用工具,只负责生成回答 + # 直接使用原始模型,不绑定工具 + models = chat_services + + # 预构建 prompt(不带工具描述) + prompt = create_system_prompt() from langchain_core.runnables.config import RunnableConfig @@ -70,14 +66,14 @@ def create_dynamic_llm_call_node(chat_services: Dict[str, BaseChatModel], tools: # 动态选择模型 model_name = getattr(state, "current_model", "") - if not model_name or model_name not in bound_models: + if not model_name or model_name not in models: # 回退到第一个可用模型 - fallback_name = next(iter(bound_models.keys())) + fallback_name = next(iter(models.keys())) info(f"[llm_call] 模型 '{model_name}' 不可用,回退到 '{fallback_name}'") model_name = fallback_name - - llm_with_tools = bound_models[model_name] - info(f"[llm_call] 使用模型: {model_name}") + + llm = models[model_name] + info(f"[llm_call] 使用模型(无工具): {model_name}") try: # 添加上下文到消息 @@ -103,7 +99,7 @@ def create_dynamic_llm_call_node(chat_services: Dict[str, BaseChatModel], tools: # 恢复为:手动进行 astream,并将所有的 chunk 拼接成最终的 response 返回。 # LangGraph 会自动监听这期间产生的所有 token。 - chain = prompt | llm_with_tools + chain = prompt | llm chunks = [] info(f"[llm_call] 开始调用 LLM astream...") async for chunk in chain.astream( @@ -115,8 +111,13 @@ def create_dynamic_llm_call_node(chat_services: Dict[str, BaseChatModel], tools: ): chunks.append(chunk) - info(f"[llm_call] LLM astream 完成,共收到 {len(chunks)} 个 chunks,info:{chunks}") + info(f"[llm_call] LLM astream 完成,共收到 {len(chunks)} 个 chunks,info:{chunks[0].content[:50]}...{chunks[-1].content[:50]}") + # 将所有 chunk 合并成最终的 AIMessage + if chunks: + response = chunks[0].content + for chunk in chunks[1:]: + response = response + chunk.content # 将所有 chunk 合并成最终的 AIMessage if chunks: response = chunks[0] @@ -167,9 +168,6 @@ def create_dynamic_llm_call_node(chat_services: Dict[str, BaseChatModel], tools: debug(f"📋 [LLM统计] 详细用量: {token_usage}") debug("="*80 + "\n") - # 检查是否有工具调用 - has_tool_calls = hasattr(response, 'tool_calls') and len(response.tool_calls) > 0 - result = { "messages": [response], "llm_calls": getattr(state, 'llm_calls', 0) + 1, @@ -179,7 +177,6 @@ def create_dynamic_llm_call_node(chat_services: Dict[str, BaseChatModel], tools: "final_result": response.content, "success": True, "current_phase": "done", - "has_tool_calls": has_tool_calls, "current_model": model_name # 记录实际使用的模型 } diff --git a/backend/app/main_graph/nodes/rag_nodes.py b/backend/app/main_graph/nodes/rag_nodes.py index d036cac..f66a1ad 100644 --- a/backend/app/main_graph/nodes/rag_nodes.py +++ b/backend/app/main_graph/nodes/rag_nodes.py @@ -19,6 +19,23 @@ from ._utils import dispatch_custom_event, make_react_event # 置信度阈值配置 RAG_CONFIDENCE_THRESHOLD = 0.6 # 低于此值认为检索不相关 +# 全局 pipeline 实例 +_rag_pipeline = None + + +def _get_rag_pipeline(): + """获取 RAG Pipeline 实例""" + global _rag_pipeline + if _rag_pipeline is None: + from backend.app.rag.pipeline import RAGPipeline + _rag_pipeline = RAGPipeline( + num_queries=3, + rerank_top_n=5, + use_rerank=True, + return_parent_docs=True, + ) + return _rag_pipeline + def _get_rag_tool() -> Optional[callable]: """获取 RAG 工具""" @@ -27,7 +44,7 @@ def _get_rag_tool() -> Optional[callable]: # ========== RAG 检索核心逻辑 ========== -async def _rag_retrieve_core(state: MainGraphState, rag_tool: callable) -> MainGraphState: +async def _rag_retrieve_core(state: MainGraphState, pipeline) -> MainGraphState: """执行 RAG 检索的核心逻辑""" retrieval_query = state.user_query @@ -38,15 +55,20 @@ async def _rag_retrieve_core(state: MainGraphState, rag_tool: callable) -> MainG if cfg and cfg.retrieval_query: retrieval_query = cfg.retrieval_query - # 调用 RAG 工具 - rag_context = await rag_tool.ainvoke(retrieval_query) + # 直接调用 pipeline 获取文档和上下文 + documents = await pipeline.aretrieve(retrieval_query) + rag_context = pipeline.format_context(documents) + info(f"[RAG Core] 获取到 rag_context: {type(rag_context)}, 长度={len(rag_context) if rag_context else 0}") + info(f"[RAG Core] 获取到 rag_docs: {len(documents)} 个文档") # 更新状态 state.rag_context = rag_context - state.rag_retrieved = True + state.rag_docs = documents # 保存文档用于置信度评估 + state.rag_retrieved = bool(documents) # 有文档才算检索成功 state.rag_attempts = getattr(state, 'rag_attempts', 0) + 1 - state.debug_info["rag_source"] = "tool" + state.debug_info["rag_source"] = "pipeline" + state.debug_info["rag_scores"] = pipeline.last_scores # 保存分数信息 return state @@ -57,12 +79,7 @@ async def rag_retrieve_node(state: MainGraphState, config: Optional[RunnableConf state.current_phase = "rag_retrieving" start_time = time.time() - rag_tool = _get_rag_tool() - if not rag_tool: - info("[RAG] RAG 工具未初始化") - state.rag_confidence = 0.0 - state.rag_retrieved = False - return state + pipeline = _get_rag_pipeline() await dispatch_custom_event( "react_reasoning", @@ -71,7 +88,7 @@ async def rag_retrieve_node(state: MainGraphState, config: Optional[RunnableConf ) try: - state = await _rag_retrieve_core(state, rag_tool) + state = await _rag_retrieve_core(state, pipeline) # 评估置信度 confidence = await _evaluate_rag_confidence(state) @@ -111,7 +128,7 @@ async def _evaluate_rag_confidence(state: MainGraphState) -> float: return 0.0 # 方式1: 向量相似度(从 rag_docs 中获取) - embedding_score = _get_embedding_similarity(state, query) + embedding_score = _get_embedding_similarity(state) info(f"[RAG Confidence] 向量相似度={embedding_score:.3f}") # 方式2: 重排序分数(从 rag_docs 中获取) @@ -131,36 +148,43 @@ async def _evaluate_rag_confidence(state: MainGraphState) -> float: def _get_embedding_similarity(state: MainGraphState) -> float: - """从 rag_docs 中获取向量相似度分数""" - rag_docs = getattr(state, "rag_docs", []) + """从 rag_scores 或 rag_docs 中获取向量相似度分数""" + # 优先从 pipeline 提供的分数中获取 + rag_scores = state.debug_info.get("rag_scores", []) + if rag_scores: + scores = [s.get("embedding_score", 0.0) for s in rag_scores] + if scores: + # 归一化到 0-1 + normalized = [min(s / 10.0, 1.0) if s > 1.0 else s for s in scores] + return max(normalized) - # 如果有多个文档,取最高分 + # 降级:从 rag_docs 中获取 + rag_docs = getattr(state, "rag_docs", []) scores = [] for doc in rag_docs: if isinstance(doc, dict): score = doc.get("score", 0.0) - # 向量相似度通常在 0-1 之间,RRF 分数可能更高 - # 归一化到 0-1 - if score > 1.0: - score = min(score / 10.0, 1.0) # 假设 max 约 10 - scores.append(score) elif hasattr(doc, "metadata"): - score = doc.metadata.get("score", 0.0) - if score > 1.0: - score = min(score / 10.0, 1.0) - scores.append(score) + score = doc.metadata.get("embedding_score", doc.metadata.get("score", 0.0)) + else: + continue + if score > 1.0: + score = min(score / 10.0, 1.0) + scores.append(score) - if scores: - # 取平均或最高分 - return max(scores) # 使用最高分更准确 - return 0.0 + return max(scores) if scores else 0.0 def _get_rerank_score(state: MainGraphState) -> float: - """从 rag_docs 中获取重排序分数""" - rag_docs = getattr(state, "rag_docs", []) + """从 rag_scores 或 rag_docs 中获取重排序分数""" + # 优先从 pipeline 提供的分数中获取 + rag_scores = state.debug_info.get("rag_scores", []) + if rag_scores: + scores = [s.get("rerank_score", 0.0) for s in rag_scores] + return max(scores) if scores else 0.0 - # 重排分数通常在 0-1 之间 + # 降级:从 rag_docs 中获取 + rag_docs = getattr(state, "rag_docs", []) scores = [] for doc in rag_docs: if isinstance(doc, dict): @@ -168,14 +192,11 @@ def _get_rerank_score(state: MainGraphState) -> float: elif hasattr(doc, "metadata"): score = doc.metadata.get("rerank_score", 0.0) else: - score = 0.0 - + continue if score > 0: scores.append(score) - if scores: - return max(scores) # 使用最高分 - return 0.0 + return max(scores) if scores else 0.0 async def _get_llm_score(state: MainGraphState) -> float: diff --git a/backend/app/main_graph/nodes/reasoning.py b/backend/app/main_graph/nodes/reasoning.py index b8c81e1..616604f 100644 --- a/backend/app/main_graph/nodes/reasoning.py +++ b/backend/app/main_graph/nodes/reasoning.py @@ -23,6 +23,8 @@ async def react_reason_node(state: MainGraphState, config: Optional[RunnableConf # 步骤1: 准备上下文 context = { "retrieved_docs": state.rag_docs, + "rag_confidence": getattr(state, "rag_confidence", 0.0), + "rag_attempts": getattr(state, "rag_attempts", 0), "previous_actions": [h.get("action") for h in state.reasoning_history], "reasoning_history": state.reasoning_history, "messages": state.messages, diff --git a/backend/app/main_graph/nodes/routing.py b/backend/app/main_graph/nodes/routing.py index 36d18b4..dff32f5 100644 --- a/backend/app/main_graph/nodes/routing.py +++ b/backend/app/main_graph/nodes/routing.py @@ -112,8 +112,8 @@ def route_by_reasoning(state: MainGraphState) -> str: info(f"[条件路由] 检测到路由循环: {previous_actions[-4:]},强制终止") return "llm_call" - # 2. 状态停滞检测(连续相同动作) - if len(previous_actions) >= 2 and previous_actions[-1] == previous_actions[-2]: + # 2. 状态停滞检测(连续相同动作 TODO:本来应该是2) + if len(previous_actions) >= 3 and previous_actions[-1] == previous_actions[-2] and previous_actions[-2] == previous_actions[-3]: info(f"[条件路由] 连续相同动作 '{previous_actions[-1]}',强制终止") return "llm_call" diff --git a/backend/app/rag/pipeline.py b/backend/app/rag/pipeline.py index 0d15fea..a161cc4 100644 --- a/backend/app/rag/pipeline.py +++ b/backend/app/rag/pipeline.py @@ -36,6 +36,8 @@ class RAGPipeline: self.rerank_top_n = rerank_top_n self.use_rerank = use_rerank self.return_parent_docs = return_parent_docs + self._last_docs = [] # 保存最后一次检索的文档 + self._last_scores = [] # 保存最后一次检索的分数 if llm == "default_small": try: @@ -49,6 +51,16 @@ class RAGPipeline: self.reranker = create_document_reranker() if use_rerank else None logger.info(f"[Pipeline] init: rerank={use_rerank}, return_parent={return_parent_docs}") + @property + def last_docs(self) -> List[Document]: + """获取最后一次检索的文档""" + return self._last_docs + + @property + def last_scores(self) -> List[dict]: + """获取最后一次检索的分数信息""" + return self._last_scores + async def aretrieve(self, query: str) -> List[Document]: # Step 1: 检索 child_docs = await self._retrieve(query) @@ -69,9 +81,24 @@ class RAGPipeline: # Step 3: 获取父文档 if self.return_parent_docs: - return await self._get_parents(child_docs) + parent_docs = await self._get_parents(child_docs) + # 保存分数信息到 last_scores 供外部访问 + self._last_scores = self._extract_scores(parent_docs) + return parent_docs + + self._last_scores = self._extract_scores(child_docs) return child_docs + def _extract_scores(self, docs: List[Document]) -> List[dict]: + """提取文档的分数信息""" + scores = [] + for doc in docs: + scores.append({ + "embedding_score": doc.metadata.get("embedding_score", doc.metadata.get("score", 0.0)), + "rerank_score": doc.metadata.get("rerank_score", 0.0), + }) + return scores + async def _retrieve(self, query: str) -> List[Document]: if self.query_generator: queries = await self.query_generator.agenerate(query) @@ -100,7 +127,7 @@ class RAGPipeline: try: from backend.rag_core import create_docstore docstore, _ = create_docstore() - parent_docs = docstore.mget(list(parent_map.keys())) + parent_docs =await docstore.amget(list(parent_map.keys())) # 构建结果,保持分数信息 result = [] diff --git a/tools/start.py b/tools/start.py index db56a5a..b17e836 100755 --- a/tools/start.py +++ b/tools/start.py @@ -13,7 +13,7 @@ from dotenv import load_dotenv # 路径设置 project_root = Path(__file__).resolve().parent.parent -sys.path.insert(0, str(project_root)) +#sys.path.insert(0, str(project_root)) load_dotenv(project_root / ".env") # 全局变量 diff --git a/tools/test/test_graph_branches.py b/tools/test/test_graph_branches.py index 687fd45..a7a4d65 100644 --- a/tools/test/test_graph_branches.py +++ b/tools/test/test_graph_branches.py @@ -26,18 +26,18 @@ TEST_CASES = [ "query": "吕布的事迹?", "description": "测试快速 RAG 分支" }, - # 测试3: 需要推理的复杂问题 - 应该直接到 React 循环 - { - "name": "复杂推理测试", - "query": "请帮我分析:如果我有10万元,想要在一年内获得15%的收益,有哪些低风险的投资方案?", - "description": "测试 React 循环推理分支" - }, - # 测试4: 需要工具调用的问题 - { - "name": "联网工具调用测试", - "query": "搜索一下今天的天气怎么样", - "description": "测试工具调用分支" - }, + # # 测试3: 需要推理的复杂问题 - 应该直接到 React 循环 + # { + # "name": "复杂推理测试", + # "query": "请帮我分析:如果我有10万元,想要在一年内获得15%的收益,有哪些低风险的投资方案?", + # "description": "测试 React 循环推理分支" + # }, + # # 测试4: 需要工具调用的问题 + # { + # "name": "联网工具调用测试", + # "query": "搜索一下今天的天气怎么样", + # "description": "测试工具调用分支" + # }, # 测试5: 带记忆的对话 { "name": "记忆测试",