Files
ailine/frontend/src/components/chat_area.py
root d8da45bc97
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Has been cancelled
fix: 修复前后端启动问题,添加 BACKEND_PORT 配置
2026-04-22 01:34:34 +08:00

347 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
中间聊天区组件
包含模型选择、消息显示和输入框
"""
import re
import streamlit as st
from state import AppState
from api_client import api_client
from config import config
def render_chat_area():
    """Render the central chat area: model picker, history, then the input box."""
    # Minimal model selector pinned at the top of the column.
    _render_model_selector()
    # A blank write acts as a spacer — cleaner than st.divider() here.
    st.write("")
    # Previously exchanged messages.
    _render_chat_history()
    # Chat input plus streaming-response handling.
    _render_input_and_response()
def _render_model_selector():
    """Render the minimalist model selector, centered via a 1-2-1 column split."""
    _, model_col, _ = st.columns([1, 2, 1])
    with model_col:
        chosen = st.selectbox(
            "选择模型",
            options=list(config.model_options.keys()),
            # Bound method instead of a lambda: same KeyError-on-miss semantics.
            format_func=config.model_options.__getitem__,
            index=_get_model_index(),
            # Hide the label so only the dropdown shows — keeps the UI modern.
            label_visibility="collapsed"
        )
        AppState.set_selected_model(chosen)
def _get_model_index() -> int:
    """Return the index of the currently selected model within the configured
    model options, falling back to 0 when the selection is absent or stale.

    Returns:
        Zero-based index into ``list(config.model_options.keys())``.
    """
    keys = list(config.model_options.keys())
    try:
        return keys.index(AppState.get_selected_model())
    except ValueError:
        # Selection not present in the configured options — default to first.
        return 0
def _render_chat_history():
    """Render every stored chat message, unpacking reasoning ("thought") blocks.

    Two thought encodings are recognized:
      1. The frontend's own ````thought ... ``` `` fenced format, written by
         `_handle_ai_response` when a stream finishes.
      2. Raw ``<think>...</think>`` tags as loaded from backend history.
    Thoughts are shown in a collapsed expander; everything else as markdown.
    """
    messages = AppState.get_messages()
    for msg in messages:
        with st.chat_message(msg["role"]):
            content = msg["content"]
            # Case 1: frontend-written ```thought fenced block(s).
            if "```thought\n" in content:
                parts = content.split("```thought\n")
                # Any text preceding the first thought fence is plain body text.
                if parts[0].strip():
                    st.markdown(parts[0])
                for part in parts[1:]:
                    if "\n```\n" in part:
                        # Closed fence: split into the thought and the reply body.
                        thought, rest = part.split("\n```\n", 1)
                        with st.expander("🤔 思考过程", expanded=False):
                            st.markdown(thought)
                        if rest.strip():
                            st.markdown(rest)
                    else:
                        # Unclosed fence — re-emit verbatim rather than lose text.
                        st.markdown("```thought\n" + part)
            # Case 2: backend-loaded history containing <think> tags.
            elif "<think>" in content and "</think>" in content:
                # Extract the thought text and the remaining body.
                thought_match = re.search(r'<think>(.*?)</think>', content, re.DOTALL)
                if thought_match:
                    thought = thought_match.group(1).strip()
                    rest = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()
                    with st.expander("🤔 思考过程", expanded=False):
                        st.markdown(thought)
                    if rest:
                        st.markdown(rest)
                else:
                    # Tags present but non-greedy match failed — show as-is.
                    st.markdown(content)
            else:
                # Plain message with no thought markup.
                st.markdown(content)
def _render_input_and_response():
    """Render the chat input box; on submit, echo the user message and stream the reply."""
    prompt = st.chat_input("请输入您的问题...", key="chat_input")
    if not prompt:
        return
    # Echo the user's message immediately.
    with st.chat_message("user"):
        st.markdown(prompt)
    AppState.add_message("user", prompt)
    # Kick off the streaming AI reply.
    _handle_ai_response()
def _handle_ai_response():
    """Consume the backend's streaming chat API (LangGraph v2 event format) and
    render the assistant reply live.

    Handles four event types: ``llm_token`` (typewriter text + reasoning),
    ``state_update`` (node/tool completions, RAG sources), ``custom``
    (tool status + completion stats), and ``error``. After the stream ends,
    the reasoning is collapsed into an expander, RAG sources are listed, and
    the full reply (with a ```thought fence) is stored in app state.
    """
    with st.chat_message("assistant"):
        # Placeholder for the reasoning trace (only shown when reasoning tokens
        # or <think> tags actually appear).
        thought_placeholder = st.empty()
        message_placeholder = st.empty()
        tool_status_placeholder = st.empty()
        raw_text = ""          # accumulated answer tokens, verbatim
        api_thought = ""       # accumulated native reasoning tokens (e.g. DeepSeek-Reasoner)
        display_text = ""      # answer text currently shown (tags stripped)
        display_thought = ""   # thought text currently shown
        rag_sources = None     # RAG retrieval sources captured from tool output
        # Open the streaming API call.
        stream = api_client.chat_stream(
            message=AppState.get_messages()[-1]["content"],
            thread_id=AppState.get_current_thread_id(),
            model=AppState.get_selected_model(),
            user_id=AppState.get_user_id()
        )
        # Consume the stream (v2 event format).
        for event in stream:
            event_type = event.get("type")
            # NOTE(review): debug leftover — `import logging` runs on every
            # iteration; should be hoisted to module level.
            import logging
            if event_type == "llm_token":
                logging.debug(f"[Frontend Stream] token: {repr(event.get('token'))}, reasoning: {repr(event.get('reasoning_token'))}")
            # 1. LLM token stream (typewriter effect).
            if event_type == "llm_token":
                # Only process tokens from the LLM node, so tool output is not
                # rendered as answer text.
                if event.get("node") == "llm_call":
                    token = str(event.get("token", ""))
                    reasoning_token = str(event.get("reasoning_token", ""))
                    if reasoning_token:
                        api_thought += reasoning_token
                    if token:
                        raw_text += token
                    display_thought = api_thought
                    display_text = raw_text
                    is_thinking = False
                    # 1) Native API reasoning mode (e.g. DeepSeek-Reasoner):
                    #    still "thinking" until the first answer token arrives.
                    if api_thought:
                        is_thinking = not bool(raw_text.strip())
                    # 2) Local-model <think> tag mode (e.g. Gemma, local DeepSeek).
                    if "<think>" in raw_text:
                        think_match = re.search(r'<think>(.*?)(</think>|$)', raw_text, re.DOTALL)
                        if think_match:
                            display_thought = think_match.group(1).strip()
                            is_thinking = "</think>" not in raw_text
                            # The body is whatever remains after removing the
                            # <think>...</think> block. While streaming,
                            # </think> may not have arrived yet — in that case
                            # strip from <think> to the end so only real body
                            # text is shown.
                            if is_thinking:
                                display_text = re.sub(r'<think>.*$', '', raw_text, flags=re.DOTALL).strip()
                            else:
                                display_text = re.sub(r'<think>.*?</think>', '', raw_text, flags=re.DOTALL).strip()
                    elif "<" in raw_text and "think" in raw_text and not raw_text.startswith("<think>"):
                        # Special case: the model is mid-way through emitting the
                        # <think> tag itself (e.g. only "<thin" so far). The full
                        # tag can't match yet, which would leak the fragment into
                        # the body — so suppress it while the prefix matches.
                        # NOTE(review): `[hink>]*` is a character class (any of
                        # h/i/n/k/>), not the literal sequence — loose but works
                        # as a prefix guard for "<think>".
                        if re.match(r'^<t[hink>]*$', raw_text):
                            display_text = ""
                            is_thinking = True
                    # Render the reasoning trace.
                    if display_thought:
                        # st.empty() trick to avoid flicker/nesting: Streamlit
                        # can't toggle an expander's expanded state mid-stream,
                        # so render a plain info box to imitate one.
                        if is_thinking:
                            # Still thinking: show content inline, no expander.
                            thought_placeholder.info(f"**🤔 思考过程 (正在思考...)**\n\n{display_thought}")
                        else:
                            # Done thinking: keep the info box until the final
                            # pass swaps it for a collapsed expander.
                            thought_placeholder.info(f"**🤔 思考过程**\n\n{display_thought}")
                    # Render the answer body.
                    if display_text or not is_thinking:
                        # NOTE(review): both branches are the empty string, so no
                        # cursor is ever shown; the "remove cursor" step after the
                        # loop suggests a glyph (e.g. "▌") was lost here — confirm.
                        cursor = "" if not is_thinking else ""
                        message_placeholder.markdown(display_text + cursor)
            # 2. State updates (node completions, tool results, ...).
            elif event_type == "state_update":
                # A state_update can carry several payloads; the common one is a
                # messages update.
                data = event.get("data", {})
                messages_update = event.get("messages", [])
                if not messages_update and isinstance(data, dict):
                    for node_name, node_data in data.items():
                        if isinstance(node_data, dict) and "messages" in node_data:
                            messages_update = node_data["messages"]
                # A messages payload means some node emitted a complete message.
                # The reply itself is already built from the token stream; this
                # path is used to detect tool-call results.
                if messages_update:
                    # Inspect the last message for tool output.
                    last_msg = messages_update[-1] if messages_update else {}
                    if isinstance(last_msg, dict) and last_msg.get("role") == "tool":
                        tool_name = last_msg.get("name", "unknown")
                        tool_content = last_msg.get("content", "")
                        # Capture RAG retrieval results.
                        if tool_name == "search_knowledge_base":
                            # tool_content may be a JSON string — try to parse it.
                            sources = []
                            try:
                                if isinstance(tool_content, str):
                                    import json
                                    data = json.loads(tool_content)
                                else:
                                    data = tool_content
                                # Pull out the source list if present.
                                if isinstance(data, dict) and "sources" in data:
                                    sources = data["sources"]
                                else:
                                    sources = [str(data)]
                            except Exception:
                                sources = [str(tool_content)]
                            rag_sources = sources
                        tool_status_placeholder.success(f"✅ 工具 {tool_name} 执行完成")
                        # Flash the status briefly, then clear to keep the UI tidy.
                        # NOTE(review): time.sleep blocks the Streamlit script
                        # for 0.5s per tool event — consider removing.
                        import time
                        time.sleep(0.5)
                        tool_status_placeholder.empty()
            # 3. Custom events (sent from the backend via get_stream_writer).
            elif event_type == "custom":
                custom_data = event.get("data", {})
                # Completion event: show token/latency stats.
                if custom_data.get("type") == "done":
                    _show_completion_stats(custom_data)
                # Other typed events, e.g. tool-call status.
                elif "type" in custom_data:
                    custom_type = custom_data["type"]
                    if custom_type == "tool_start":
                        tool_name = custom_data.get("tool", "unknown")
                        tool_status_placeholder.info(f"🔧 调用工具: {tool_name}...")
                    elif custom_type == "tool_end":
                        tool_name = custom_data.get("tool", "unknown")
                        # NOTE(review): success() is immediately cleared by
                        # empty() on the next line, so it never stays visible.
                        tool_status_placeholder.success(f"✅ 工具 {tool_name} 完成")
                        tool_status_placeholder.empty()
                elif "status" in custom_data:
                    status_msg = custom_data.get("status", "")
                    tool_status_placeholder.info(f"🔧 {status_msg}")
            # 4. Errors.
            elif event_type == "error":
                st.error(f"❌ 错误: {event.get('message', '未知错误')}")
                break  # stop processing on error
        # NOTE: the v2 format has no dedicated "done" event — end of stream
        # means completion. Stats (token_usage, elapsed_time) typically ride in
        # the final state_update and could be handled here if the backend sends
        # them in the final state.
        # Stream finished: drop the cursor and persist the full reply.
        display_text = raw_text
        display_thought = api_thought
        # Final tag cleanup, in case </think> never closed.
        if "<think>" in raw_text:
            think_match = re.search(r'<think>(.*?)(</think>|$)', raw_text, re.DOTALL)
            if think_match:
                display_thought = think_match.group(1).strip()
                display_text = re.sub(r'<think>.*?(</think>|$)', '', raw_text, flags=re.DOTALL).strip()
        if display_thought:
            # Only at the very end is the thought moved into a collapsed expander.
            with thought_placeholder.container():
                with st.expander("🤔 思考过程", expanded=False):
                    st.markdown(display_thought)
        else:
            thought_placeholder.empty()
        # Remove the cursor (final render of the body text).
        message_placeholder.markdown(display_text)
        # Show RAG retrieval sources, if any were captured.
        if rag_sources:
            with st.expander("🔍 检索来源", expanded=False):
                # Format the source list.
                if isinstance(rag_sources, list):
                    for i, source in enumerate(rag_sources, 1):
                        if isinstance(source, dict):
                            content = source.get("page_content", source.get("content", str(source)))
                            metadata = source.get("metadata", {})
                            filename = metadata.get("filename", metadata.get("source", "未知文件"))
                            page = metadata.get("page", metadata.get("page_number", ""))
                            # NOTE(review): `filename` is computed but never used;
                            # the literal "(unknown)" in these f-strings looks like
                            # it should be {filename} — confirm against history.
                            if page:
                                source_info = f"**来源 {i}:** (unknown) (第{page}页)"
                            else:
                                source_info = f"**来源 {i}:** (unknown)"
                            st.markdown(source_info)
                            # Preview the first 200 characters of the chunk.
                            preview = content[:200] + "..." if len(content) > 200 else content
                            st.markdown(f"> {preview}")
                            st.markdown("---")
                        else:
                            st.markdown(f"**来源 {i}:** {str(source)}")
                else:
                    st.markdown(str(rag_sources))
        # Assemble the stored content with the thought fence so history renders
        # it correctly later (see _render_chat_history case 1).
        final_content = display_text
        if display_thought:
            final_content = f"```thought\n{display_thought}\n```\n\n" + display_text
        AppState.add_message("assistant", final_content)
        tool_status_placeholder.empty()
    # After sending, silently refresh the thread list (a new conversation may
    # have been created, or an old summary updated).
    from .sidebar import _refresh_threads
    _refresh_threads()
    # Force a rerun so the sidebar shows the latest records immediately.
    st.rerun()
def _show_completion_stats(event: dict):
    """Show a small caption with token usage and elapsed time for the reply.

    Args:
        event: Completion-event payload; expected keys are ``token_usage``
            (dict) and ``elapsed_time`` (seconds). Nothing is rendered when
            token usage is absent.
    """
    usage = event.get("token_usage", {})
    if not usage:
        return
    elapsed = event.get("elapsed_time", 0)
    total = usage.get("total_tokens", 0)
    st.caption(f"📊 消耗 {total} tokens | ⏱️ {elapsed:.2f}s")