Files
ailine/backend/app/core/stream_finalizer.py
root 4c119073bc
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 6m6s
优化输出
2026-05-09 01:51:18 +08:00

221 lines
6.9 KiB
Python

"""
流式输出格式化器
在流式输出结束后,追加格式化结构(表格、引用等)
解决流式输出与模板渲染的冲突
"""
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from backend.app.core.formatter import get_formatter
@dataclass
class StreamAppend:
"""流式追加内容"""
type: str # "table" | "quote" | "list" | "divider" | "text"
content: Any
class StreamFinalizer:
"""
流式输出格式化器
在流式输出结束后追加结构化内容:
- 表格
- 引用块
- 分割线
- 文本
使用方式:
1. 流式输出主体内容
2. 调用 build_append() 获取追加内容
3. 发送追加事件到前端
"""
def __init__(self):
self.formatter = get_formatter()
self.md = self.formatter.md
self._appends: List[StreamAppend] = []
def reset(self):
"""重置追加队列"""
self._appends = []
return self
def add_table(self, data: List[Dict], headers: Optional[List[str]] = None):
"""添加表格"""
self._appends.append(StreamAppend(
type="table",
content={"data": data, "headers": headers}
))
return self
def add_quote(self, text: str, author: Optional[str] = None):
"""添加引用块"""
self._appends.append(StreamAppend(
type="quote",
content={"text": text, "author": author}
))
return self
def add_list(self, items: List[str], numbered: bool = False):
"""添加列表"""
self._appends.append(StreamAppend(
type="list",
content={"items": items, "numbered": numbered}
))
return self
def add_divider(self):
"""添加分割线"""
self._appends.append(StreamAppend(type="divider", content=None))
return self
def add_text(self, text: str):
"""添加文本"""
self._appends.append(StreamAppend(type="text", content=text))
return self
def add_knowledge_summary(self, topic: str, summary: str,
key_points: Optional[List[Dict]] = None,
table_data: Optional[List[Dict]] = None,
sources: Optional[List[Dict]] = None):
"""添加知识总结(使用模板)"""
table = ""
if table_data:
table = self.md.table(table_data)
self._appends.append(StreamAppend(
type="template",
content={
"name": "knowledge_summary",
"context": {
"topic": topic,
"timestamp": self._now(),
"summary": summary,
"key_points": key_points or [],
"table_data": table_data,
"table": table,
"sources": sources or [],
}
}
))
return self
def add_web_results(self, query: str, results: List[Dict]):
"""添加搜索结果"""
self._appends.append(StreamAppend(
type="template",
content={
"name": "web_search_result",
"context": {
"query": query,
"result_count": len(results),
"results": results,
}
}
))
return self
def build_append(self) -> str:
"""
构建追加内容
Returns:
格式化后的追加文本
"""
if not self._appends:
return ""
lines = []
lines.append("") # 空行分隔
lines.append(self.md.divider())
lines.append("")
for append in self._appends:
if append.type == "table":
lines.append(self.md.table(append.content["data"], append.content.get("headers")))
elif append.type == "quote":
lines.append(self.md.quote(append.content["text"], append.content.get("author")))
elif append.type == "list":
if append.content["numbered"]:
lines.append(self.md.numbered_list(append.content["items"]))
else:
lines.append(self.md.bullet_list(append.content["items"]))
elif append.type == "divider":
lines.append(self.md.divider())
elif append.type == "text":
lines.append(append.content)
elif append.type == "template":
template_name = append.content["name"]
context = append.content["context"]
lines.append(self.formatter.render(template_name, **context))
lines.append("")
return "\n".join(lines)
def build_events(self) -> List[Dict[str, Any]]:
"""
构建追加事件列表(用于流式发送)
Returns:
事件列表,每项包含 type 和 content
"""
if not self._appends:
return []
events = []
for append in self._appends:
if append.type == "table":
events.append({
"type": "append_table",
"content": self.md.table(append.content["data"], append.content.get("headers"))
})
elif append.type == "quote":
events.append({
"type": "append_quote",
"content": self.md.quote(append.content["text"], append.content.get("author"))
})
elif append.type == "list":
events.append({
"type": "append_list",
"content": self.md.numbered_list(append.content["items"]) if append.content["numbered"]
else self.md.bullet_list(append.content["items"])
})
elif append.type == "divider":
events.append({
"type": "append_divider",
"content": self.md.divider()
})
elif append.type == "text":
events.append({
"type": "append_text",
"content": append.content
})
elif append.type == "template":
template_name = append.content["name"]
context = append.content["context"]
events.append({
"type": "append_template",
"template": template_name,
"content": self.formatter.render(template_name, **context)
})
return events
@staticmethod
def _now() -> str:
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# ========== 便捷函数 ==========
def create_finalizer() -> StreamFinalizer:
"""创建流式格式化器"""
return StreamFinalizer()