239 lines
8.1 KiB
Python
239 lines
8.1 KiB
Python
"""
|
|
格式化输出工具模块
|
|
|
|
提供统一的 Markdown 格式化能力:
|
|
1. MarkdownFormatter - 静态方法生成 Markdown 元素
|
|
2. OutputRenderer - 模板渲染 + 全局单例
|
|
3. get_formatter() - 获取全局单例
|
|
"""
|
|
|
|
from typing import Dict, List, Any, Optional
|
|
from pathlib import Path
|
|
|
|
from backend.app.logger import info, warning
|
|
from backend.app.templates import TEMPLATES_DIR
|
|
|
|
|
|
# ========== Markdown 格式化器 ==========
|
|
|
|
class MarkdownFormatter:
|
|
"""Markdown 格式化工具(静态方法)"""
|
|
|
|
@staticmethod
|
|
def table(data: List[Dict[str, Any]], headers: Optional[List[str]] = None) -> str:
|
|
"""生成 Markdown 表格"""
|
|
if not data:
|
|
return ""
|
|
|
|
if headers is None:
|
|
headers = list(data[0].keys()) if data else []
|
|
|
|
if not headers:
|
|
return ""
|
|
|
|
lines = []
|
|
header_line = "| " + " | ".join(str(h) for h in headers) + " |"
|
|
lines.append(header_line)
|
|
separator_line = "| " + " | ".join("---" for _ in headers) + " |"
|
|
lines.append(separator_line)
|
|
|
|
for row in data:
|
|
row_values = [str(row.get(h, "")) for h in headers]
|
|
lines.append("| " + " | ".join(row_values) + " |")
|
|
|
|
return "\n".join(lines)
|
|
|
|
@staticmethod
|
|
def bullet_list(items: List[str], indent: int = 0) -> str:
|
|
"""生成无序列表"""
|
|
indent_str = " " * indent
|
|
return "\n".join(f"{indent_str}- {item}" for item in items)
|
|
|
|
@staticmethod
|
|
def numbered_list(items: List[str], start: int = 1, indent: int = 0) -> str:
|
|
"""生成有序列表"""
|
|
indent_str = " " * indent
|
|
return "\n".join(f"{indent_str}{i}. {item}" for i, item in enumerate(items, start=start))
|
|
|
|
@staticmethod
|
|
def quote(text: str, author: Optional[str] = None) -> str:
|
|
"""生成引用块"""
|
|
quoted = "\n".join(f"> {line}" for line in text.split("\n"))
|
|
if author:
|
|
quoted += f"\n> — {author}"
|
|
return quoted
|
|
|
|
@staticmethod
|
|
def code(code: str, language: str = "") -> str:
|
|
"""生成代码块"""
|
|
return f"```{language}\n{code}\n```"
|
|
|
|
@staticmethod
|
|
def heading(text: str, level: int = 1) -> str:
|
|
"""生成标题"""
|
|
level = max(1, min(6, level))
|
|
return f"{'#' * level} {text}"
|
|
|
|
@staticmethod
|
|
def link(text: str, url: str) -> str:
|
|
"""生成链接"""
|
|
return f"[{text}]({url})"
|
|
|
|
@staticmethod
|
|
def bold(text: str) -> str:
|
|
"""生成粗体"""
|
|
return f"**{text}**"
|
|
|
|
@staticmethod
|
|
def italic(text: str) -> str:
|
|
"""生成斜体"""
|
|
return f"*{text}*"
|
|
|
|
@staticmethod
|
|
def divider() -> str:
|
|
"""生成分割线"""
|
|
return "---"
|
|
|
|
@staticmethod
|
|
def format(data: Any) -> str:
|
|
"""根据数据类型自动选择格式化方式"""
|
|
if isinstance(data, list):
|
|
if data and isinstance(data[0], dict):
|
|
return MarkdownFormatter.table(data)
|
|
return MarkdownFormatter.bullet_list([str(item) for item in data])
|
|
elif isinstance(data, dict):
|
|
return MarkdownFormatter.table([data])
|
|
return str(data)
|
|
|
|
|
|
# ========== 模板加载器 ==========
|
|
|
|
class TemplateManager:
|
|
"""模板管理器,支持从文件加载 .md 或 .jinja 模板"""
|
|
|
|
def __init__(self, template_dir: Optional[Path] = None):
|
|
self.template_dir = template_dir or TEMPLATES_DIR
|
|
self._templates: Dict[str, str] = {}
|
|
self._load_templates()
|
|
|
|
def _load_templates(self) -> None:
|
|
"""从模板目录加载所有 .md 和 .jinja 文件"""
|
|
if not self.template_dir.exists():
|
|
warning(f"[Formatter] 模板目录不存在: {self.template_dir}")
|
|
return
|
|
|
|
for tmpl_file in self.template_dir.glob("*.md"):
|
|
name = tmpl_file.stem
|
|
self._templates[name] = tmpl_file.read_text(encoding="utf-8")
|
|
info(f"[Formatter] 加载模板: {name}")
|
|
|
|
for tmpl_file in self.template_dir.glob("*.jinja"):
|
|
name = tmpl_file.stem
|
|
self._templates[name] = tmpl_file.read_text(encoding="utf-8")
|
|
info(f"[Formatter] 加载模板: {name}")
|
|
|
|
def get(self, name: str) -> Optional[str]:
|
|
"""获取模板内容"""
|
|
return self._templates.get(name)
|
|
|
|
def render(self, name: str, context: Dict[str, Any]) -> str:
|
|
"""渲染模板"""
|
|
template = self.get(name)
|
|
if not template:
|
|
raise ValueError(f"模板不存在: {name}")
|
|
|
|
return self._render_string(template, context)
|
|
|
|
def _render_string(self, template_str: str, context: Dict[str, Any]) -> str:
|
|
"""渲染模板字符串"""
|
|
# 尝试 Jinja2
|
|
try:
|
|
from jinja2 import Template
|
|
tmpl = Template(template_str)
|
|
return tmpl.render(**context)
|
|
except ImportError:
|
|
pass
|
|
|
|
# 简单替换作为兜底
|
|
result = template_str
|
|
for key, value in context.items():
|
|
str_val = str(value)
|
|
result = result.replace(f"{{{{ {key} }}}}", str_val)
|
|
result = result.replace(f"{{{{{key}}}}}", str_val)
|
|
# Handle conditional blocks {{#if key}}...{{/if}}
|
|
import re
|
|
result = re.sub(r"\{\{#if\s+" + re.escape(key) + r"\}\}(.*?)\{\{/if\}\}",
|
|
str_val and r"\1" or "", result, flags=re.DOTALL)
|
|
# Handle each loops {{#each key}}...{{/each}}
|
|
if isinstance(value, list):
|
|
pattern = r"\{\{#each\s+" + re.escape(key) + r"\}\}(.*?)\{\{/each\}\}"
|
|
matches = re.findall(pattern, result, flags=re.DOTALL)
|
|
for match in matches:
|
|
rendered_items = []
|
|
for idx, item in enumerate(value):
|
|
item_str = match
|
|
if isinstance(item, dict):
|
|
for k, v in item.items():
|
|
item_str = item_str.replace("{{ " + k + " }}", str(v))
|
|
item_str = item_str.replace("{{" + k + "}}", str(v))
|
|
item_str = item_str.replace("{{ @" + k + " }}", str(idx + 1))
|
|
item_str = item_str.replace("{{ @index }}", str(idx + 1))
|
|
else:
|
|
item_str = item_str.replace("{{ this }}", str(item))
|
|
rendered_items.append(item_str)
|
|
result = re.sub(pattern, "\n".join(rendered_items), result, flags=re.DOTALL)
|
|
return result
|
|
|
|
|
|
# ========== 输出渲染器 ==========
|
|
|
|
class OutputRenderer:
|
|
"""
|
|
输出渲染器 - 统一接口渲染模板
|
|
使用全局单例,通过 get_formatter() 获取
|
|
"""
|
|
|
|
def __init__(self, template_dir: Optional[Path] = None):
|
|
self._templates = TemplateManager(template_dir)
|
|
self.md = MarkdownFormatter()
|
|
|
|
def render(self, template_name: str, **context) -> str:
|
|
"""
|
|
渲染模板
|
|
|
|
Args:
|
|
template_name: 模板名称(不含扩展名)
|
|
**context: 渲染上下文
|
|
"""
|
|
return self._templates.render(template_name, context)
|
|
|
|
def render_plain(self, data: Any) -> str:
|
|
"""直接格式化数据为 Markdown"""
|
|
return self.md.format(data)
|
|
|
|
def render_error(self, error_type: str, error_message: str = "",
|
|
suggestions: Optional[List[str]] = None,
|
|
retry_count: int = 0, max_retries: Optional[int] = None) -> str:
|
|
"""渲染错误通知模板"""
|
|
context = {
|
|
"error_type": error_type,
|
|
"error_message": error_message,
|
|
"suggestions": self.md.bullet_list(suggestions or ["请稍后重试"]),
|
|
"retry_count": retry_count,
|
|
"max_retries": max_retries,
|
|
}
|
|
return self.render("error_notification", **context)
|
|
|
|
|
|
# ========== 全局单例 ==========
|
|
|
|
_formatter: Optional[OutputRenderer] = None
|
|
|
|
|
|
def get_formatter() -> OutputRenderer:
|
|
"""获取全局 OutputRenderer 单例"""
|
|
global _formatter
|
|
if _formatter is None:
|
|
_formatter = OutputRenderer()
|
|
return _formatter
|