""" 格式化输出工具模块 提供统一的 Markdown 格式化能力: 1. MarkdownFormatter - 静态方法生成 Markdown 元素 2. OutputRenderer - 模板渲染 + 全局单例 3. get_formatter() - 获取全局单例 """ from typing import Dict, List, Any, Optional from pathlib import Path from backend.app.logger import info, warning from backend.app.templates import TEMPLATES_DIR # ========== Markdown 格式化器 ========== class MarkdownFormatter: """Markdown 格式化工具(静态方法)""" @staticmethod def table(data: List[Dict[str, Any]], headers: Optional[List[str]] = None) -> str: """生成 Markdown 表格""" if not data: return "" if headers is None: headers = list(data[0].keys()) if data else [] if not headers: return "" lines = [] header_line = "| " + " | ".join(str(h) for h in headers) + " |" lines.append(header_line) separator_line = "| " + " | ".join("---" for _ in headers) + " |" lines.append(separator_line) for row in data: row_values = [str(row.get(h, "")) for h in headers] lines.append("| " + " | ".join(row_values) + " |") return "\n".join(lines) @staticmethod def bullet_list(items: List[str], indent: int = 0) -> str: """生成无序列表""" indent_str = " " * indent return "\n".join(f"{indent_str}- {item}" for item in items) @staticmethod def numbered_list(items: List[str], start: int = 1, indent: int = 0) -> str: """生成有序列表""" indent_str = " " * indent return "\n".join(f"{indent_str}{i}. {item}" for i, item in enumerate(items, start=start)) @staticmethod def quote(text: str, author: Optional[str] = None) -> str: """生成引用块""" quoted = "\n".join(f"> {line}" for line in text.split("\n")) if author: quoted += f"\n> — {author}" return quoted @staticmethod def code(code: str, language: str = "") -> str: """生成代码块""" return f"```{language}\n{code}\n```" @staticmethod def heading(text: str, level: int = 1) -> str: """生成标题""" level = max(1, min(6, level)) return f"{'#' * level} {text}" @staticmethod def link(text: str, url: str) -> str: """生成链接""" return f"[{text}]({url})" @staticmethod def bold(text: str) -> str: """生成粗体""" return f"**{text}**" @staticmethod def italic(text: str) -> str: """生成斜体""" return f"*{text}*" @staticmethod def divider() -> str: """生成分割线""" return "---" @staticmethod def format(data: Any) -> str: """根据数据类型自动选择格式化方式""" if isinstance(data, list): if data and isinstance(data[0], dict): return MarkdownFormatter.table(data) return MarkdownFormatter.bullet_list([str(item) for item in data]) elif isinstance(data, dict): return MarkdownFormatter.table([data]) return str(data) # ========== 模板加载器 ========== class TemplateManager: """模板管理器,支持从文件加载 .md 或 .jinja 模板""" def __init__(self, template_dir: Optional[Path] = None): self.template_dir = template_dir or TEMPLATES_DIR self._templates: Dict[str, str] = {} self._load_templates() def _load_templates(self) -> None: """从模板目录加载所有 .md 和 .jinja 文件""" if not self.template_dir.exists(): warning(f"[Formatter] 模板目录不存在: {self.template_dir}") return for tmpl_file in self.template_dir.glob("*.md"): name = tmpl_file.stem self._templates[name] = tmpl_file.read_text(encoding="utf-8") info(f"[Formatter] 加载模板: {name}") for tmpl_file in self.template_dir.glob("*.jinja"): name = tmpl_file.stem self._templates[name] = tmpl_file.read_text(encoding="utf-8") info(f"[Formatter] 加载模板: {name}") def get(self, name: str) -> Optional[str]: """获取模板内容""" return self._templates.get(name) def render(self, name: str, context: Dict[str, Any]) -> str: """渲染模板""" template = self.get(name) if not template: raise ValueError(f"模板不存在: {name}") return self._render_string(template, context) def _render_string(self, template_str: str, context: Dict[str, Any]) -> str: """渲染模板字符串""" # 尝试 Jinja2 try: from jinja2 import Template tmpl = Template(template_str) return tmpl.render(**context) except ImportError: pass # 简单替换作为兜底 result = template_str for key, value in context.items(): str_val = str(value) result = result.replace(f"{{{{ {key} }}}}", str_val) result = result.replace(f"{{{{{key}}}}}", str_val) # Handle conditional blocks {{#if key}}...{{/if}} import re result = re.sub(r"\{\{#if\s+" + re.escape(key) + r"\}\}(.*?)\{\{/if\}\}", str_val and r"\1" or "", result, flags=re.DOTALL) # Handle each loops {{#each key}}...{{/each}} if isinstance(value, list): pattern = r"\{\{#each\s+" + re.escape(key) + r"\}\}(.*?)\{\{/each\}\}" matches = re.findall(pattern, result, flags=re.DOTALL) for match in matches: rendered_items = [] for idx, item in enumerate(value): item_str = match if isinstance(item, dict): for k, v in item.items(): item_str = item_str.replace("{{ " + k + " }}", str(v)) item_str = item_str.replace("{{" + k + "}}", str(v)) item_str = item_str.replace("{{ @" + k + " }}", str(idx + 1)) item_str = item_str.replace("{{ @index }}", str(idx + 1)) else: item_str = item_str.replace("{{ this }}", str(item)) rendered_items.append(item_str) result = re.sub(pattern, "\n".join(rendered_items), result, flags=re.DOTALL) return result # ========== 输出渲染器 ========== class OutputRenderer: """ 输出渲染器 - 统一接口渲染模板 使用全局单例,通过 get_formatter() 获取 """ def __init__(self, template_dir: Optional[Path] = None): self._templates = TemplateManager(template_dir) self.md = MarkdownFormatter() def render(self, template_name: str, **context) -> str: """ 渲染模板 Args: template_name: 模板名称(不含扩展名) **context: 渲染上下文 """ return self._templates.render(template_name, context) def render_plain(self, data: Any) -> str: """直接格式化数据为 Markdown""" return self.md.format(data) def render_error(self, error_type: str, error_message: str = "", suggestions: Optional[List[str]] = None, retry_count: int = 0, max_retries: Optional[int] = None) -> str: """渲染错误通知模板""" context = { "error_type": error_type, "error_message": error_message, "suggestions": self.md.bullet_list(suggestions or ["请稍后重试"]), "retry_count": retry_count, "max_retries": max_retries, } return self.render("error_notification", **context) # ========== 全局单例 ========== _formatter: Optional[OutputRenderer] = None def get_formatter() -> OutputRenderer: """获取全局 OutputRenderer 单例""" global _formatter if _formatter is None: _formatter = OutputRenderer() return _formatter