重构代码,实现相对导入
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 5m26s

This commit is contained in:
2026-04-21 10:26:37 +08:00
parent 37e021e302
commit 726236eaff
68 changed files with 119 additions and 3990 deletions

9
frontend/src/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
"""
AI Agent 前端模块
采用分层架构设计包含配置、状态、API客户端和UI组件
"""
from .logger import debug, info, warning, error
__version__ = "2.0.0"
__all__ = ["debug", "info", "warning", "error"]

191
frontend/src/api_client.py Normal file
View File

@@ -0,0 +1,191 @@
"""
API 客户端模块
封装所有与后端的通信,支持流式响应
"""
import json
from typing import List, Dict, Any, Generator
import requests
# 使用相对导入
from .config import config
from .logger import error, warning
class APIClient:
"""后端 API 客户端 - 统一封装所有 HTTP 请求"""
def __init__(self, base_url: str = None):
"""
初始化 API 客户端
Args:
base_url: 后端 API 地址(默认从配置读取)
"""
self.base_url = (base_url or config.api_base).rstrip("/")
# ==================== 历史管理接口 ====================
def get_user_threads(self, user_id: str, limit: int = None) -> List[Dict[str, Any]]:
"""
获取用户的历史对话列表
Args:
user_id: 用户 ID
limit: 返回数量限制(默认使用配置值)
Returns:
线程列表,每个元素包含 thread_id, summary, message_count, last_updated
"""
try:
resp = requests.get(
f"{self.base_url}/threads",
params={
"user_id": user_id,
"limit": limit or config.history_limit
},
timeout=10
)
if resp.status_code == 200:
return resp.json().get("threads", [])
else:
warning(f"获取历史列表失败: HTTP {resp.status_code}")
return []
except Exception as e:
error(f"获取历史列表异常: {e}")
return []
def get_thread_messages(self, thread_id: str, user_id: str) -> List[Dict[str, str]]:
"""
获取指定线程的完整消息历史
Args:
thread_id: 线程 ID
user_id: 用户 ID
Returns:
消息列表,每个元素包含 role 和 content
"""
try:
resp = requests.get(
f"{self.base_url}/thread/{thread_id}/messages",
params={"user_id": user_id},
timeout=10
)
if resp.status_code == 200:
return resp.json().get("messages", [])
else:
warning(f"获取消息历史失败: HTTP {resp.status_code}")
return []
except Exception as e:
error(f"获取消息历史异常: {e}")
return []
def get_thread_summary(self, thread_id: str, user_id: str) -> Dict[str, Any]:
"""
获取指定线程的摘要信息
Args:
thread_id: 线程 ID
user_id: 用户 ID
Returns:
摘要信息字典
"""
try:
resp = requests.get(
f"{self.base_url}/thread/{thread_id}/summary",
params={"user_id": user_id},
timeout=10
)
if resp.status_code == 200:
return resp.json()
else:
warning(f"获取线程摘要失败: HTTP {resp.status_code}")
return {"summary": "加载失败", "message_count": 0}
except Exception as e:
error(f"获取线程摘要异常: {e}")
return {"summary": "加载失败", "message_count": 0}
# ==================== 聊天接口 ====================
def chat_stream(
self,
message: str,
thread_id: str,
model: str,
user_id: str
) -> Generator[Dict[str, Any], None, None]:
"""
流式对话接口SSE
Args:
message: 用户消息
thread_id: 线程 ID
model: 模型名称
user_id: 用户 ID
Yields:
SSE 事件字典,类型包括:
- token: 逐字输出 {type: "token", content: "..."}
- tool_start: 工具调用开始 {type: "tool_start", tool: "..."}
- tool_end: 工具调用完成 {type: "tool_end", tool: "..."}
- done: 对话完成 {type: "done", token_usage: {...}, elapsed_time: ...}
- error: 错误信息 {type: "error", message: "..."}
"""
payload = {
"message": message,
"thread_id": thread_id,
"model": model,
"user_id": user_id
}
try:
with requests.post(
f"{self.base_url}/chat/stream",
json=payload,
stream=True,
timeout=config.stream_timeout
) as response:
if response.status_code != 200:
yield {
"type": "error",
"message": f"请求失败: HTTP {response.status_code}"
}
return
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
yield data
except json.JSONDecodeError as e:
warning(f"JSON 解析失败: {e}")
except requests.exceptions.Timeout:
yield {
"type": "error",
"message": "请求超时,请检查网络连接"
}
except Exception as e:
error(f"流式对话异常: {e}")
yield {
"type": "error",
"message": f"请求失败: {str(e)}"
}
# 全局 API 客户端实例(单例模式)
api_client = APIClient()

View File

@@ -0,0 +1,4 @@
"""
UI 组件模块
包含所有可复用的 Streamlit 组件
"""

View File

@@ -0,0 +1,347 @@
"""
中间聊天区组件
包含模型选择、消息显示和输入框
"""
import re
import streamlit as st
# 使用相对导入
from ..state import AppState
from ..api_client import api_client
from ..config import config
def render_chat_area():
"""渲染中间聊天区域"""
# 顶部:极简模型选择器(可选放在顶部中间)
_render_model_selector()
# 使用空白占位符或者不需要 divider 让界面更干净
st.write("")
# 渲染历史消息
_render_chat_history()
# 输入框和流式响应处理
_render_input_and_response()
def _render_model_selector():
"""渲染模型选择器,极简风格"""
col_empty1, col_model, col_empty2 = st.columns([1, 2, 1])
with col_model:
selected_model = st.selectbox(
"选择模型",
options=list(config.model_options.keys()),
format_func=lambda x: config.model_options[x],
index=_get_model_index(),
label_visibility="collapsed" # 隐藏标签,只显示下拉框,更加现代
)
AppState.set_selected_model(selected_model)
def _get_model_index() -> int:
"""
获取当前选中模型的索引
Returns:
模型索引
"""
current_model = AppState.get_selected_model()
model_keys = list(config.model_options.keys())
return model_keys.index(current_model) if current_model in model_keys else 0
def _render_chat_history():
"""渲染历史聊天消息"""
messages = AppState.get_messages()
for msg in messages:
with st.chat_message(msg["role"]):
content = msg["content"]
# 1. 尝试解析我们在前端流式结束后存入的 ```thought 格式
if "```thought\n" in content:
parts = content.split("```thought\n")
if parts[0].strip():
st.markdown(parts[0])
for part in parts[1:]:
if "\n```\n" in part:
thought, rest = part.split("\n```\n", 1)
with st.expander("🤔 思考过程", expanded=False):
st.markdown(thought)
if rest.strip():
st.markdown(rest)
else:
st.markdown("```thought\n" + part)
# 2. 尝试解析从后端原始加载的历史记录中包含的 <think> 标签
elif "<think>" in content and "</think>" in content:
# 提取思考内容和剩余正文
thought_match = re.search(r'<think>(.*?)</think>', content, re.DOTALL)
if thought_match:
thought = thought_match.group(1).strip()
rest = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()
with st.expander("🤔 思考过程", expanded=False):
st.markdown(thought)
if rest:
st.markdown(rest)
else:
st.markdown(content)
else:
st.markdown(content)
def _render_input_and_response():
"""渲染输入框并处理用户输入与AI响应"""
if prompt := st.chat_input("请输入您的问题...", key="chat_input"):
# 显示用户消息
with st.chat_message("user"):
st.markdown(prompt)
AppState.add_message("user", prompt)
# 流式调用 AI 回复
_handle_ai_response()
def _handle_ai_response():
"""处理 AI 流式响应 (适配 LangGraph v2 事件格式)"""
with st.chat_message("assistant"):
# 用于容纳思考过程的占位符(只有在使用 DeepSeek reasoner 时才显示)
thought_placeholder = st.empty()
message_placeholder = st.empty()
tool_status_placeholder = st.empty()
raw_text = ""
api_thought = ""
display_text = ""
display_thought = ""
rag_sources = None # 存储 RAG 检索来源信息
# 调用流式 API
stream = api_client.chat_stream(
message=AppState.get_messages()[-1]["content"],
thread_id=AppState.get_current_thread_id(),
model=AppState.get_selected_model(),
user_id=AppState.get_user_id()
)
# 消费流式响应 (v2 格式)
for event in stream:
event_type = event.get("type")
# [DEBUG] 可以在前端终端看到接收到的事件
import logging
if event_type == "llm_token":
logging.debug(f"[Frontend Stream] token: {repr(event.get('token'))}, reasoning: {repr(event.get('reasoning_token'))}")
# 1. 处理 LLM Token 流 (打字机效果)
if event_type == "llm_token":
# 确保只处理来自 LLM 的 token避免将工具的输出作为 token 显示
if event.get("node") == "llm_call":
token = str(event.get("token", ""))
reasoning_token = str(event.get("reasoning_token", ""))
if reasoning_token:
api_thought += reasoning_token
if token:
raw_text += token
display_thought = api_thought
display_text = raw_text
is_thinking = False
# 1. 原生 API 推理模式 (如 DeepSeek-Reasoner)
if api_thought:
is_thinking = not bool(raw_text.strip())
# 2. 本地模型 <think> 标签模式 (如 Gemma, 本地 DeepSeek)
if "<think>" in raw_text:
think_match = re.search(r'<think>(.*?)(</think>|$)', raw_text, re.DOTALL)
if think_match:
display_thought = think_match.group(1).strip()
is_thinking = "</think>" not in raw_text
# 正文部分应该是除去了整个 <think>...</think> 块后的剩余内容
# 注意:流式输出时可能 </think> 还没出来,此时也要把 <think> 到末尾的部分剔除,只显示正文
if is_thinking:
display_text = re.sub(r'<think>.*$', '', raw_text, flags=re.DOTALL).strip()
else:
display_text = re.sub(r'<think>.*?</think>', '', raw_text, flags=re.DOTALL).strip()
elif "<" in raw_text and "think" in raw_text and not raw_text.startswith("<think>"):
# 处理一种特殊情况:模型正在输出 <think> 标签的过程中(例如刚输出了 "<thin"
# 此时正则表达式匹配不到完整的 "<think>",会导致残缺的标签显示在正文中
# 我们做个简单拦截:如果在开头发现了不完整的标签,暂时不显示它
if re.match(r'^<t[hink>]*$', raw_text):
display_text = ""
is_thinking = True
# 渲染思考过程
if display_thought:
# 使用 st.empty 的特殊方式来避免闪烁和嵌套
# Streamlit 无法在流式中动态切换 expander 的 expanded 状态
# 最好的方法是直接写一个 markdown 组件,使用 info 的样式来模拟
if is_thinking:
# 正在思考时,直接显示内容,不要用 expander
thought_placeholder.info(f"**🤔 思考过程 (正在思考...)**\n\n{display_thought}")
else:
# 思考完毕后,将 placeholder 替换为空,等待最终替换为折叠面板
thought_placeholder.info(f"**🤔 思考过程**\n\n{display_thought}")
# 渲染正式回复
if display_text or not is_thinking:
cursor = "" if not is_thinking else ""
message_placeholder.markdown(display_text + cursor)
# 2. 处理状态更新 (节点完成、工具结果等)
elif event_type == "state_update":
# state_update 可能包含多种数据,常见的是 messages 更新
data = event.get("data", {})
messages_update = event.get("messages", [])
if not messages_update and isinstance(data, dict):
for node_name, node_data in data.items():
if isinstance(node_data, dict) and "messages" in node_data:
messages_update = node_data["messages"]
# 如果更新中包含 messages说明某个节点输出了完整消息
# 但我们已经在用 token 流构建回复,这里可以用来检测工具调用结果
if messages_update:
# 检查最后一条消息是否来自工具
last_msg = messages_update[-1] if messages_update else {}
if isinstance(last_msg, dict) and last_msg.get("role") == "tool":
tool_name = last_msg.get("name", "unknown")
tool_content = last_msg.get("content", "")
# 存储 RAG 检索结果
if tool_name == "search_knowledge_base":
# 尝试解析 tool_content它可能是 JSON 字符串
sources = []
try:
if isinstance(tool_content, str):
import json
data = json.loads(tool_content)
else:
data = tool_content
# 提取来源列表
if isinstance(data, dict) and "sources" in data:
sources = data["sources"]
else:
sources = [str(data)]
except Exception:
sources = [str(tool_content)]
rag_sources = sources
tool_status_placeholder.success(f"✅ 工具 {tool_name} 执行完成")
# 短暂显示后清除,保持界面清爽
import time
time.sleep(0.5)
tool_status_placeholder.empty()
# 3. 处理自定义事件 (你在后端通过 get_stream_writer 发送的)
elif event_type == "custom":
custom_data = event.get("data", {})
# 检查是否是完成事件
if custom_data.get("type") == "done":
_show_completion_stats(custom_data)
# 其他自定义事件,比如工具调用状态
elif "type" in custom_data:
custom_type = custom_data["type"]
if custom_type == "tool_start":
tool_name = custom_data.get("tool", "unknown")
tool_status_placeholder.info(f"🔧 调用工具: {tool_name}...")
elif custom_type == "tool_end":
tool_name = custom_data.get("tool", "unknown")
tool_status_placeholder.success(f"✅ 工具 {tool_name} 完成")
tool_status_placeholder.empty()
elif "status" in custom_data:
status_msg = custom_data.get("status", "")
tool_status_placeholder.info(f"🔧 {status_msg}")
# 4. 处理错误
elif event_type == "error":
st.error(f"❌ 错误: {event.get('message', '未知错误')}")
break # 发生错误时停止处理
# 注意v2 格式中没有固定的 "done" 事件,流结束即代表完成
# 统计信息 (token_usage, elapsed_time) 通常会在最后的 state_update 中携带
# 如果后端在最终状态里返回了这些信息,可以在此处理
# 流结束后,移除光标并保存完整回复
display_text = raw_text
display_thought = api_thought
# 最后的标签清理,以防未闭合
if "<think>" in raw_text:
think_match = re.search(r'<think>(.*?)(</think>|$)', raw_text, re.DOTALL)
if think_match:
display_thought = think_match.group(1).strip()
display_text = re.sub(r'<think>.*?(</think>|$)', '', raw_text, flags=re.DOTALL).strip()
if display_thought:
# 只有在最终结束时,才把它放进折叠面板
with thought_placeholder.container():
with st.expander("🤔 思考过程", expanded=False):
st.markdown(display_thought)
else:
thought_placeholder.empty()
# 移除光标
message_placeholder.markdown(display_text)
# 显示 RAG 检索来源(如果有)
if rag_sources:
with st.expander("🔍 检索来源", expanded=False):
# 格式化来源列表
if isinstance(rag_sources, list):
for i, source in enumerate(rag_sources, 1):
if isinstance(source, dict):
content = source.get("page_content", source.get("content", str(source)))
metadata = source.get("metadata", {})
filename = metadata.get("filename", metadata.get("source", "未知文件"))
page = metadata.get("page", metadata.get("page_number", ""))
if page:
source_info = f"**来源 {i}:** {filename} (第{page}页)"
else:
source_info = f"**来源 {i}:** {filename}"
st.markdown(source_info)
# 显示内容预览前200字符
preview = content[:200] + "..." if len(content) > 200 else content
st.markdown(f"> {preview}")
st.markdown("---")
else:
st.markdown(f"**来源 {i}:** {str(source)}")
else:
st.markdown(str(rag_sources))
# 拼装包含思考过程的完整内容,以便后续在历史中正确渲染
final_content = display_text
if display_thought:
final_content = f"```thought\n{display_thought}\n```\n\n" + display_text
AppState.add_message("assistant", final_content)
tool_status_placeholder.empty()
# 消息发送完毕后,静默刷新历史记录列表
# (因为可能生成了新对话,或者旧对话摘要已更新)
from .sidebar import _refresh_threads
_refresh_threads()
# 强制重绘页面,使侧边栏立即显示最新记录
st.rerun()
def _show_completion_stats(event: dict):
"""
显示对话完成统计信息
Args:
event: 完成事件数据
"""
token_usage = event.get("token_usage", {})
elapsed = event.get("elapsed_time", 0)
if token_usage:
total_tokens = token_usage.get("total_tokens", 0)
st.caption(f"📊 消耗 {total_tokens} tokens | ⏱️ {elapsed:.2f}s")

View File

@@ -0,0 +1,39 @@
"""
右侧信息面板组件
显示会话信息和统计数据
"""
import streamlit as st
# 使用相对导入
from ..state import AppState
def render_info_panel():
"""渲染右侧信息面板(现改为侧边栏底部)"""
st.caption("📊 会话信息")
# 消息统计
_render_message_stats()
# 使用提示
_render_tips()
def _render_message_stats():
"""渲染消息统计"""
stats = AppState.get_message_stats()
st.markdown(f"<span style='font-size:0.8em;color:#666;'>共 {stats['user']} 问 / {stats['assistant']} 答</span>", unsafe_allow_html=True)
def _render_tips():
"""渲染使用提示"""
with st.expander("💡 使用提示", expanded=False):
st.markdown("""
<div style='font-size:0.85em;color:#555;'>
- 左侧可切换历史对话<br>
- 点击"新对话"开始新话题<br>
- 登录后对话历史隔离<br>
- 模型可随时切换
</div>
""", unsafe_allow_html=True)

View File

@@ -0,0 +1,164 @@
"""
左侧栏组件
包含用户登录和历史对话列表
"""
import streamlit as st
from datetime import datetime
# 使用相对导入
from ..state import AppState
from ..api_client import api_client
def render_sidebar():
"""渲染左侧栏"""
# 顶部放置新对话按钮,像 ChatGPT/DeepSeek 一样显眼
_render_history_actions()
st.divider()
# 历史列表
_render_history_section()
# 底部放用户部分
st.divider()
_render_user_section()
def _render_user_section():
"""渲染用户登录区域"""
# st.header("👤 用户") # 移除显眼的标题,改用更柔和的 caption
st.caption("👤 用户管理")
if not AppState.is_logged_in():
_render_login_form()
else:
_render_user_info()
def _render_login_form():
"""渲染登录表单"""
username = st.text_input(
"用户名",
key="login_input",
placeholder="输入用户名...",
help="未登录将使用 default_user可能导致对话污染",
label_visibility="collapsed"
)
if st.button("进入", type="secondary", use_container_width=True):
AppState.login(username)
_refresh_threads()
st.rerun()
# st.info("💡 建议登录以隔离对话历史") # 移除多余色块
def _render_user_info():
"""渲染用户信息"""
st.markdown(f"**当前用户**: `{AppState.get_user_id()}`")
if st.button("切换用户", type="secondary", use_container_width=True):
AppState.logout()
_refresh_threads()
st.rerun()
def _render_history_section():
"""渲染历史对话列表"""
col1, col2 = st.columns([3, 1])
with col1:
st.caption("📚 对话历史")
with col2:
if st.button("🔄", help="刷新列表", key="refresh_history_btn"):
_refresh_threads()
_render_thread_list()
def _render_history_actions():
"""渲染历史操作按钮"""
# 移除了 type="primary",让它变成普通的线框按钮,不再是大红块
if st.button(" 新对话", use_container_width=True):
AppState.start_new_thread()
st.rerun()
def _render_thread_list():
"""渲染线程列表"""
# 仅在初次加载时拉取,或由外部主动调用 _refresh_threads() 更新
if "threads_loaded" not in st.session_state:
_refresh_threads()
st.session_state.threads_loaded = True
threads = AppState.get_threads()
if not threads:
st.caption("暂无对话历史")
return
for thread in threads:
_render_thread_item(thread)
def _render_thread_item(thread: dict):
"""
渲染单个线程项
Args:
thread: 线程信息字典
"""
thread_id = thread["thread_id"]
summary = thread.get("summary", "新对话")
# 判断是否为当前线程
is_current = thread_id == AppState.get_current_thread_id()
# 根据是否当前线程改变按钮样式
btn_type = "primary" if is_current else "tertiary"
# 为了避免内容过长,截断摘要
display_text = summary[:15] + "..." if len(summary) > 15 else summary
if st.button(
display_text,
key=f"thread_{thread_id}",
help=f"完整摘要: {summary}",
use_container_width=True,
type=btn_type
):
_load_thread(thread_id)
def _format_time(time_str: str) -> str:
"""
格式化时间字符串
Args:
time_str: ISO 格式时间字符串
Returns:
格式化后的时间字符串
"""
if not time_str:
return "未知"
try:
dt = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
return dt.strftime("%m-%d %H:%M")
except Exception:
return time_str[:10]
def _refresh_threads():
"""刷新历史线程列表"""
threads = api_client.get_user_threads(AppState.get_user_id())
AppState.set_threads(threads)
def _load_thread(thread_id: str):
"""
加载指定线程的消息历史
Args:
thread_id: 线程 ID
"""
messages = api_client.get_thread_messages(thread_id, AppState.get_user_id())
if messages:
AppState.set_current_thread_id(thread_id)
AppState.clear_messages()
for msg in messages:
AppState.add_message(msg["role"], msg["content"])
st.rerun()
else:
st.error("加载对话失败")

62
frontend/src/config.py Normal file
View File

@@ -0,0 +1,62 @@
"""
前端配置管理模块
集中管理所有配置项,支持环境变量覆盖
"""
import os
from dataclasses import dataclass
from typing import Optional
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
@dataclass
class FrontendConfig:
"""前端配置类 - 统一管理所有配置项"""
# ==================== API 配置 ====================
api_base: str = ""
# ==================== 页面配置 ====================
page_title: str = "AI 个人助手"
page_icon: str = "🤖"
layout: str = "wide"
# ==================== 模型配置 ====================
default_model: str = "local" # 更改为local作为默认模型
model_options: Optional[dict] = None
# ==================== 用户配置 ====================
default_user_id: str = "default_user"
# ==================== 历史记录配置 ====================
history_limit: int = 50
summary_max_length: int = 30
# ==================== 流式响应配置 ====================
stream_timeout: int = 120
def __post_init__(self):
"""初始化后处理 - 设置默认值和加载环境变量"""
if self.model_options is None:
self.model_options = {
"local": "本地 llama.cppGemma-4", # 本地模型作为第一个
"deepseek": "DeepSeek V3.2(在线)", # DeepSeek 作为中间
"zhipu": "智谱 GLM-4.7-Flash在线" # GLM-4.7 作为最后一个
}
# 从环境变量加载配置
self._load_from_env()
def _load_from_env(self):
"""从环境变量加载配置(优先级最高)"""
# API 地址(移除 /chat 后缀)
# 优先级:环境变量 API_URL > 默认值
api_url = os.getenv("API_URL", "http://127.0.0.1:8079")
self.api_base = api_url.replace("/chat", "").rstrip("/")
# 全局配置实例(单例模式)
config = FrontendConfig()

View File

@@ -0,0 +1,126 @@
"""
AI Agent 前端主入口
采用模块化架构,仅负责组装各组件
"""
import sys
import os
# 添加项目根目录到 Python 路径,支持绝对导入
# 现在的结构: frontend/src/frontend_main.py所以要获取 frontend/ 目录作为根
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import streamlit as st
# 使用相对导入
from .config import config
from .state import AppState
from .components.sidebar import render_sidebar
from .components.chat_area import render_chat_area
from .components.info_panel import render_info_panel
# =============================================================================
# 页面配置
# =============================================================================
st.set_page_config(
page_title=config.page_title,
page_icon=config.page_icon,
layout=config.layout
)
# =============================================================================
# 初始化状态
# =============================================================================
AppState.init()
def apply_custom_css():
"""应用自定义CSS样式实现极简风格"""
st.markdown("""
<style>
/* 移除顶部默认空白 */
.block-container {
padding-top: 2rem !important;
padding-bottom: 2rem !important;
}
/* 侧边栏样式优化:降低背景色对比度,稍微暗一点提高区分度 */
[data-testid="stSidebar"] {
background-color: #f0f2f5 !important;
border-right: 1px solid #e1e4e8;
}
/* 隐藏标题和头像边框的粗重线条 */
hr {
margin: 1em 0;
border-color: #eee;
}
/* 自定义按钮样式:去除强烈的背景色,使用浅色线框或扁平风 */
.stButton>button {
border-radius: 8px;
font-weight: 500;
}
/* 覆盖 Primary 按钮默认的刺眼大红色,改为柔和的深色高亮 */
.stButton>button[kind="primary"] {
background-color: #e5e7eb !important;
color: #1f2937 !important;
border: 1px solid #d1d5db !important;
}
/* 覆盖 Primary 按钮悬停效果 */
.stButton>button[kind="primary"]:hover {
background-color: #d1d5db !important;
border-color: #9ca3af !important;
color: #111827 !important;
}
/* 普通按钮悬停效果 */
.stButton>button:hover {
border-color: #9ca3af;
color: #1f2937;
background-color: #f9fafb;
}
/* 聊天输入框美化 */
[data-testid="stChatInput"] {
border-radius: 12px;
border: 1px solid #e0e0e0;
box-shadow: 0 2px 10px rgba(0,0,0,0.03);
}
/* 用户和 AI 的头像调整 */
.stChatMessage {
padding: 1rem 0;
border-bottom: 1px solid #f8f8f8;
}
</style>
""", unsafe_allow_html=True)
# =============================================================================
# 主界面
# =============================================================================
def main():
"""主界面渲染 - 极简宽屏布局"""
# 应用 CSS
apply_custom_css()
# 顶部标题(可选,也可以不放,让界面更像对话框)
st.markdown("<h3 style='text-align: center; font-weight: 400; color: #555; margin-bottom: 2rem;'>个人助手</h3>", unsafe_allow_html=True)
# 左侧边栏:合并用户登录、模型选择和历史对话
with st.sidebar:
render_sidebar()
# 将原本右侧的信息面板简化并移入侧边栏底部
st.divider()
render_info_panel()
# 中间主区域:全宽的聊天区域
render_chat_area()
if __name__ == "__main__":
main()

78
frontend/src/logger.py Normal file
View File

@@ -0,0 +1,78 @@
"""
前端日志模块
基于环境变量控制日志级别,与后端保持一致
"""
import os
import logging
from typing import Any
from dotenv import load_dotenv
# 先加载环境变量
load_dotenv()
# ==================== 日志配置 ====================
# 从环境变量读取日志级别,默认 INFO
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# 根据环境变量控制是否显示详细调试信息
DEBUG_MODE = os.getenv("DEBUG", "false").lower() == "true"
# 创建统一的日志器
logger = logging.getLogger("ai_agent_frontend")
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
# 避免重复添加 handler
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# ==================== 日志函数 ====================
def debug(msg: Any, *args, **kwargs):
"""
调试日志,仅在 DEBUG 环境变量为 true 时打印
Args:
msg: 日志消息
"""
if DEBUG_MODE:
logger.debug(msg, *args, **kwargs)
def info(msg: Any, *args, **kwargs):
"""
信息日志
Args:
msg: 日志消息
"""
logger.info(msg, *args, **kwargs)
def warning(msg: Any, *args, **kwargs):
"""
警告日志
Args:
msg: 日志消息
"""
logger.warning(msg, *args, **kwargs)
def error(msg: Any, *args, **kwargs):
"""
错误日志
Args:
msg: 日志消息
"""
logger.error(msg, *args, **kwargs)

167
frontend/src/state.py Normal file
View File

@@ -0,0 +1,167 @@
"""
前端状态管理模块
使用 Streamlit Session State 管理应用状态
"""
import uuid
from typing import List, Dict, Any
import streamlit as st
from .config import config
class AppState:
"""应用状态管理器 - 统一管理所有 session_state"""
@staticmethod
def init():
"""初始化所有状态变量"""
# 用户状态
if "user_id" not in st.session_state:
st.session_state.user_id = config.default_user_id
if "logged_in" not in st.session_state:
st.session_state.logged_in = False
# 对话状态
if "current_thread_id" not in st.session_state:
st.session_state.current_thread_id = str(uuid.uuid4())
if "messages" not in st.session_state:
st.session_state.messages = []
# 历史列表
if "threads" not in st.session_state:
st.session_state.threads = []
if "loading_history" not in st.session_state:
st.session_state.loading_history = False
# 模型选择
if "selected_model" not in st.session_state:
st.session_state.selected_model = config.default_model
# ==================== 用户相关 ====================
@staticmethod
def get_user_id() -> str:
"""获取当前用户 ID"""
return st.session_state.user_id
@staticmethod
def is_logged_in() -> bool:
"""检查是否已登录"""
return st.session_state.logged_in
@staticmethod
def login(username: str):
"""
用户登录
Args:
username: 用户名,为空则使用默认用户
"""
st.session_state.user_id = username.strip() if username.strip() else config.default_user_id
st.session_state.logged_in = True
# 登录后必须开启一个干净的新对话
AppState.start_new_thread()
@staticmethod
def logout():
"""用户登出,重置为默认用户"""
st.session_state.logged_in = False
st.session_state.user_id = config.default_user_id
st.session_state.threads = []
# 登出后必须开启一个干净的新对话
AppState.start_new_thread()
# ==================== 线程相关 ====================
@staticmethod
def get_current_thread_id() -> str:
"""获取当前线程 ID"""
return st.session_state.current_thread_id
@staticmethod
def set_current_thread_id(thread_id: str):
"""
设置当前线程 ID
Args:
thread_id: 线程 ID
"""
st.session_state.current_thread_id = thread_id
@staticmethod
def start_new_thread():
"""开始新对话,生成新线程 ID 并清空消息"""
st.session_state.current_thread_id = str(uuid.uuid4())
st.session_state.messages = []
# ==================== 消息相关 ====================
@staticmethod
def get_messages() -> List[Dict[str, str]]:
"""获取消息列表"""
return st.session_state.messages
@staticmethod
def add_message(role: str, content: str):
"""
添加消息
Args:
role: 消息角色 (user/assistant)
content: 消息内容
"""
st.session_state.messages.append({"role": role, "content": content})
@staticmethod
def clear_messages():
"""清空消息列表"""
st.session_state.messages = []
@staticmethod
def get_message_stats() -> Dict[str, int]:
"""
获取消息统计
Returns:
包含 user 和 assistant 消息数量的字典
"""
messages = st.session_state.messages
return {
"user": len([m for m in messages if m["role"] == "user"]),
"assistant": len([m for m in messages if m["role"] == "assistant"])
}
# ==================== 历史列表相关 ====================
@staticmethod
def get_threads() -> List[Dict[str, Any]]:
"""获取历史线程列表"""
return st.session_state.threads
@staticmethod
def set_threads(threads: List[Dict[str, Any]]):
"""
设置历史线程列表
Args:
threads: 线程列表
"""
st.session_state.threads = threads
# ==================== 模型相关 ====================
@staticmethod
def get_selected_model() -> str:
"""获取选中的模型"""
return st.session_state.selected_model
@staticmethod
def set_selected_model(model: str):
"""
设置选中的模型
Args:
model: 模型标识符
"""
st.session_state.selected_model = model

56
frontend/src/utils.py Normal file
View File

@@ -0,0 +1,56 @@
"""
前端工具函数模块
包含通用的辅助函数
"""
from datetime import datetime
from typing import Optional
def format_datetime(dt_str: Optional[str], format: str = "%m-%d %H:%M") -> str:
"""
格式化日期时间字符串
Args:
dt_str: ISO 格式的日期时间字符串
format: 输出格式
Returns:
格式化后的字符串
"""
if not dt_str:
return "未知"
try:
dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
return dt.strftime(format)
except:
return dt_str[:10]
def truncate_text(text: str, max_length: int = 50, suffix: str = "...") -> str:
"""
截断文本
Args:
text: 原始文本
max_length: 最大长度
suffix: 截断后缀
Returns:
截断后的文本
"""
if len(text) <= max_length:
return text
return text[:max_length] + suffix
def generate_thread_id() -> str:
"""
生成新的线程 ID
Returns:
UUID 字符串
"""
import uuid
return str(uuid.uuid4())