feat: 添加子图API端点和前端测试面板,包含确定取消继续交互
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m3s

This commit is contained in:
2026-04-25 19:38:22 +08:00
parent 96dc01f8c2
commit 3bc9b19bab
4 changed files with 524 additions and 0 deletions

View File

@@ -350,3 +350,235 @@ if __name__ == "__main__":
# 使用环境变量或默认端口 8079避免与 llama.cpp 的 8081 端口冲突)
port = int(BACKEND_PORT)
uvicorn.run(app, host="0.0.0.0", port=port)
# ==================== 子图专用 API 端点 ====================
# 简化版本,直接调用各个子图,无需完整 agent_service
# 注意:这些是独立测试用的简化端点,方便前端直接调用
@app.get("/subgraph/dictionary/{action}")
async def dictionary_subgraph_api(
action: str,
query: str = "",
user_id: str = "default"
):
"""词典子图简化 API"""
from backend.app.agent_subgraphs.dictionary import (
DictionaryState,
DictionaryAction,
parse_intent,
format_result
)
from backend.app.agent_subgraphs.dictionary.nodes import (
query_word, translate_text, extract_terms, get_daily_word
)
# 创建初始状态
state = DictionaryState(user_query=query, user_id=user_id)
# 处理 action
try:
if action == "query":
state.action = DictionaryAction.QUERY
state.action_params = {"word": query}
state = query_word(state)
elif action == "translate":
state.action = DictionaryAction.TRANSLATE
state.source_text = query
state = translate_text(state)
elif action == "daily":
state.action = DictionaryAction.DAILY_WORD
state = get_daily_word(state)
elif action == "extract":
state.action = DictionaryAction.EXTRACT
state.action_params = {"text": query}
state = extract_terms(state)
else:
# 自动解析意图
state = parse_intent(state)
# 根据解析后的 action 调用
if state.action == DictionaryAction.QUERY:
state = query_word(state)
elif state.action == DictionaryAction.TRANSLATE:
state = translate_text(state)
elif state.action == DictionaryAction.DAILY_WORD:
state = get_daily_word(state)
elif state.action == DictionaryAction.EXTRACT:
state = extract_terms(state)
# 格式化结果
state = format_result(state)
return {
"success": True,
"action": str(state.action),
"result": state.final_result,
"raw_data": {
"word_entry": vars(state.word_entry) if state.word_entry else None,
"translated_text": state.translated_text,
"extracted_terms": [vars(t) for t in state.extracted_terms],
"daily_word": vars(state.daily_word) if state.daily_word else None
}
}
except Exception as e:
return {"success": False, "error": str(e)}
@app.get("/subgraph/news/{action}")
async def news_subgraph_api(
action: str,
query: str = "",
user_id: str = "default"
):
"""资讯子图简化 API"""
from backend.app.agent_subgraphs.news_analysis import (
NewsAnalysisState,
NewsAction,
parse_intent,
format_result
)
from backend.app.agent_subgraphs.news_analysis.nodes import (
query_news, analyze_url, extract_keywords, generate_report
)
# 创建初始状态
state = NewsAnalysisState(user_query=query, user_id=user_id)
# 处理 action
try:
if action == "query":
state.action = NewsAction.QUERY_NEWS
state = query_news(state)
elif action == "analyze":
state.action = NewsAction.ANALYZE_URL
state.custom_urls = [query]
state = analyze_url(state)
elif action == "keywords":
state.action = NewsAction.EXTRACT_KEYWORDS
state = extract_keywords(state)
elif action == "report":
state.action = NewsAction.GENERATE_REPORT
state = generate_report(state)
else:
# 自动解析意图
state = parse_intent(state)
# 根据解析后的 action 调用
if state.action == NewsAction.QUERY_NEWS:
state = query_news(state)
elif state.action == NewsAction.ANALYZE_URL:
state.custom_urls = [query]
state = analyze_url(state)
elif state.action == NewsAction.EXTRACT_KEYWORDS:
state = extract_keywords(state)
elif state.action == NewsAction.GENERATE_REPORT:
state = generate_report(state)
# 格式化结果
state = format_result(state)
return {
"success": True,
"action": str(state.action),
"result": state.final_result,
"raw_data": {
"news_items": [vars(item) for item in state.news_items],
"extracted_keywords": state.extracted_keywords,
"report_content": state.report_content
}
}
except Exception as e:
return {"success": False, "error": str(e)}
@app.get("/subgraph/contact/{action}")
async def contact_subgraph_api(
action: str,
query: str = "",
user_id: str = "default"
):
"""通讯录子图简化 API"""
from backend.app.agent_subgraphs.contact import (
ContactState,
ContactAction,
parse_intent,
format_result
)
from backend.app.agent_subgraphs.contact.nodes import (
list_contacts, add_contact, list_emails, generate_email_draft, sniff_contacts
)
# 创建初始状态
state = ContactState(user_query=query, user_id=user_id)
# 处理 action
try:
if action == "list":
state.action = ContactAction.CONTACT_LIST
state = list_contacts(state)
elif action == "add":
state.action = ContactAction.CONTACT_ADD
state = add_contact(state)
elif action == "emails":
state.action = ContactAction.EMAIL_LIST
state = list_emails(state)
elif action == "draft":
state.action = ContactAction.EMAIL_SEND
state = generate_email_draft(state)
elif action == "sniff":
state.action = ContactAction.SNIFF_CONTACTS
state = sniff_contacts(state)
else:
# 自动解析意图
state = parse_intent(state)
# 根据解析后的 action 调用
if state.action == ContactAction.CONTACT_LIST:
state = list_contacts(state)
elif state.action == ContactAction.CONTACT_ADD:
state = add_contact(state)
elif state.action == ContactAction.EMAIL_LIST:
state = list_emails(state)
elif state.action == ContactAction.EMAIL_SEND:
state = generate_email_draft(state)
elif state.action == ContactAction.SNIFF_CONTACTS:
state = sniff_contacts(state)
# 格式化结果
state = format_result(state)
return {
"success": True,
"action": str(state.action),
"result": state.final_result,
"raw_data": {
"contacts": [vars(c) for c in state.contacts],
"emails": [vars(e) for e in state.emails],
"current_contact": vars(state.current_contact) if state.current_contact else None,
"draft": {
"subject": state.draft_subject,
"recipient": state.draft_recipient,
"body": state.draft_body
},
"sniffed": [vars(c) for c in state.sniffed_contacts]
}
}
except Exception as e:
return {"success": False, "error": str(e)}
@app.get("/subgraph/help")
async def subgraph_help_api():
"""子图 API 使用帮助"""
return {
"dictionary": {
"actions": ["query", "translate", "daily", "extract", "auto"],
"endpoint": "/subgraph/dictionary/{action}"
},
"news": {
"actions": ["query", "analyze", "keywords", "report", "auto"],
"endpoint": "/subgraph/news/{action}"
},
"contact": {
"actions": ["list", "add", "emails", "draft", "sniff", "auto"],
"endpoint": "/subgraph/contact/{action}"
}
}

View File

@@ -379,6 +379,70 @@ class APIClient:
error(f"请求审核异常: {e}")
return ""
# ==================== 子图专用 API ====================
def call_dictionary_subgraph(self, action: str, query: str = "", user_id: str = "default") -> dict:
"""调用词典子图"""
try:
resp = requests.get(
f"{self.base_url}/subgraph/dictionary/{action}",
params={"query": query, "user_id": user_id},
timeout=10
)
if resp.status_code == 200:
return resp.json()
else:
warning(f"调用词典子图失败: HTTP {resp.status_code}")
return {"success": False, "error": "HTTP错误"}
except Exception as e:
error(f"调用词典子图异常: {e}")
return {"success": False, "error": str(e)}
def call_news_subgraph(self, action: str, query: str = "", user_id: str = "default") -> dict:
"""调用资讯子图"""
try:
resp = requests.get(
f"{self.base_url}/subgraph/news/{action}",
params={"query": query, "user_id": user_id},
timeout=10
)
if resp.status_code == 200:
return resp.json()
else:
warning(f"调用资讯子图失败: HTTP {resp.status_code}")
return {"success": False, "error": "HTTP错误"}
except Exception as e:
error(f"调用资讯子图异常: {e}")
return {"success": False, "error": str(e)}
def call_contact_subgraph(self, action: str, query: str = "", user_id: str = "default") -> dict:
"""调用通讯录子图"""
try:
resp = requests.get(
f"{self.base_url}/subgraph/contact/{action}",
params={"query": query, "user_id": user_id},
timeout=10
)
if resp.status_code == 200:
return resp.json()
else:
warning(f"调用通讯录子图失败: HTTP {resp.status_code}")
return {"success": False, "error": "HTTP错误"}
except Exception as e:
error(f"调用通讯录子图异常: {e}")
return {"success": False, "error": str(e)}
def get_subgraph_help(self) -> dict:
"""获取子图 API 使用帮助"""
try:
resp = requests.get(f"{self.base_url}/subgraph/help", timeout=5)
if resp.status_code == 200:
return resp.json()
else:
return {}
except Exception as e:
return {}
# 全局 API 客户端实例(单例模式)
api_client = APIClient()

View File

@@ -0,0 +1,222 @@
"""
子图测试面板组件
包含词典、资讯、通讯录三个子图的测试界面
使用确定取消继续交互
"""
import streamlit as st
from api_client import api_client
from state import AppState
def render_subgraph_panel():
"""渲染子图测试面板"""
st.markdown("## 🔧 子图测试面板")
with st.expander("📚 词典子图", expanded=False):
_render_dictionary_panel()
with st.expander("📰 资讯子图", expanded=False):
_render_news_panel()
with st.expander("📇 通讯录子图", expanded=False):
_render_contact_panel()
def _render_dictionary_panel():
"""渲染词典子图测试面板"""
# 会话状态管理
if "dict_action" not in st.session_state:
st.session_state.dict_action = "auto"
if "dict_query" not in st.session_state:
st.session_state.dict_query = ""
if "dict_result" not in st.session_state:
st.session_state.dict_result = None
if "dict_confirm" not in st.session_state:
st.session_state.dict_confirm = False
# 选择 Action
action_col1, action_col2 = st.columns([1, 2])
with action_col1:
action = st.selectbox(
"操作",
options=["auto", "query", "translate", "daily", "extract"],
index=["auto", "query", "translate", "daily", "extract"].index(st.session_state.dict_action),
key="dict_action_selector"
)
with action_col2:
query = st.text_input("查询内容", value=st.session_state.dict_query, key="dict_query_input")
# 按钮行
btn_col1, btn_col2, btn_col3 = st.columns([1, 1, 1])
with btn_col1:
if st.button("▶️ 运行", key="dict_run", use_container_width=True):
st.session_state.dict_action = action
st.session_state.dict_query = query
with st.spinner("调用子图中..."):
result = api_client.call_dictionary_subgraph(action, query)
st.session_state.dict_result = result
st.session_state.dict_confirm = False
st.rerun()
# 显示结果
if st.session_state.dict_result:
result = st.session_state.dict_result
if result.get("success"):
st.success("✅ 调用成功")
st.markdown("### 结果")
st.markdown(result.get("result", ""))
# 确定/取消/继续按钮
confirm_col1, confirm_col2, confirm_col3 = st.columns([1, 1, 1])
with confirm_col1:
if st.button("✅ 确定", key="dict_confirm", use_container_width=True):
st.session_state.dict_confirm = True
st.info("已确认结果")
with confirm_col2:
if st.button("❌ 取消", key="dict_cancel", use_container_width=True):
st.session_state.dict_result = None
st.rerun()
with confirm_col3:
if st.button("⏭️ 继续", key="dict_continue", use_container_width=True):
st.session_state.dict_confirm = True
st.info("已继续")
# 显示原始数据(可选)
with st.expander("🔍 原始数据", expanded=False):
st.json(result.get("raw_data", {}))
else:
st.error(f"❌ 调用失败: {result.get('error')}")
def _render_news_panel():
"""渲染资讯子图测试面板"""
# 会话状态管理
if "news_action" not in st.session_state:
st.session_state.news_action = "auto"
if "news_query" not in st.session_state:
st.session_state.news_query = ""
if "news_result" not in st.session_state:
st.session_state.news_result = None
if "news_confirm" not in st.session_state:
st.session_state.news_confirm = False
# 选择 Action
action_col1, action_col2 = st.columns([1, 2])
with action_col1:
action = st.selectbox(
"操作",
options=["auto", "query", "analyze", "keywords", "report"],
index=["auto", "query", "analyze", "keywords", "report"].index(st.session_state.news_action),
key="news_action_selector"
)
with action_col2:
query = st.text_input("查询内容/URL", value=st.session_state.news_query, key="news_query_input")
# 按钮行
btn_col1, btn_col2, btn_col3 = st.columns([1, 1, 1])
with btn_col1:
if st.button("▶️ 运行", key="news_run", use_container_width=True):
st.session_state.news_action = action
st.session_state.news_query = query
with st.spinner("调用子图中..."):
result = api_client.call_news_subgraph(action, query)
st.session_state.news_result = result
st.session_state.news_confirm = False
st.rerun()
# 显示结果
if st.session_state.news_result:
result = st.session_state.news_result
if result.get("success"):
st.success("✅ 调用成功")
st.markdown("### 结果")
st.markdown(result.get("result", ""))
# 确定/取消/继续按钮
confirm_col1, confirm_col2, confirm_col3 = st.columns([1, 1, 1])
with confirm_col1:
if st.button("✅ 确定", key="news_confirm", use_container_width=True):
st.session_state.news_confirm = True
st.info("已确认结果")
with confirm_col2:
if st.button("❌ 取消", key="news_cancel", use_container_width=True):
st.session_state.news_result = None
st.rerun()
with confirm_col3:
if st.button("⏭️ 继续", key="news_continue", use_container_width=True):
st.session_state.news_confirm = True
st.info("已继续")
# 显示原始数据(可选)
with st.expander("🔍 原始数据", expanded=False):
st.json(result.get("raw_data", {}))
else:
st.error(f"❌ 调用失败: {result.get('error')}")
def _render_contact_panel():
"""渲染通讯录子图测试面板"""
# 会话状态管理
if "contact_action" not in st.session_state:
st.session_state.contact_action = "auto"
if "contact_query" not in st.session_state:
st.session_state.contact_query = ""
if "contact_result" not in st.session_state:
st.session_state.contact_result = None
if "contact_confirm" not in st.session_state:
st.session_state.contact_confirm = False
# 选择 Action
action_col1, action_col2 = st.columns([1, 2])
with action_col1:
action = st.selectbox(
"操作",
options=["auto", "list", "add", "emails", "draft", "sniff"],
index=["auto", "list", "add", "emails", "draft", "sniff"].index(st.session_state.contact_action),
key="contact_action_selector"
)
with action_col2:
query = st.text_input("查询内容", value=st.session_state.contact_query, key="contact_query_input")
# 按钮行
btn_col1, btn_col2, btn_col3 = st.columns([1, 1, 1])
with btn_col1:
if st.button("▶️ 运行", key="contact_run", use_container_width=True):
st.session_state.contact_action = action
st.session_state.contact_query = query
with st.spinner("调用子图中..."):
result = api_client.call_contact_subgraph(action, query)
st.session_state.contact_result = result
st.session_state.contact_confirm = False
st.rerun()
# 显示结果
if st.session_state.contact_result:
result = st.session_state.contact_result
if result.get("success"):
st.success("✅ 调用成功")
st.markdown("### 结果")
st.markdown(result.get("result", ""))
# 确定/取消/继续按钮
confirm_col1, confirm_col2, confirm_col3 = st.columns([1, 1, 1])
with confirm_col1:
if st.button("✅ 确定", key="contact_confirm", use_container_width=True):
st.session_state.contact_confirm = True
st.info("已确认结果")
with confirm_col2:
if st.button("❌ 取消", key="contact_cancel", use_container_width=True):
st.session_state.contact_result = None
st.rerun()
with confirm_col3:
if st.button("⏭️ 继续", key="contact_continue", use_container_width=True):
st.session_state.contact_confirm = True
st.info("已继续")
# 显示原始数据(可选)
with st.expander("🔍 原始数据", expanded=False):
st.json(result.get("raw_data", {}))
else:
st.error(f"❌ 调用失败: {result.get('error')}")

View File

@@ -19,12 +19,14 @@ if __name__ == '__main__':
from components.sidebar import render_sidebar
from components.chat_area import render_chat_area
from components.info_panel import render_info_panel
from components.subgraph_panel import render_subgraph_panel
else:
from .config import config
from .state import AppState
from .components.sidebar import render_sidebar
from .components.chat_area import render_chat_area
from .components.info_panel import render_info_panel
from .components.subgraph_panel import render_subgraph_panel
# =============================================================================
@@ -128,6 +130,10 @@ def main():
# 中间主区域:全宽的聊天区域
render_chat_area()
# 底部:子图测试面板
st.divider()
render_subgraph_panel()
if __name__ == "__main__":
main()