feat: 实现完整的人工审核功能与子图模块
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m9s

- 新增三个核心子图:人工审核、意图理解、格式化输出
- 实现完整的审核 API 端点(/api/review/*)
- 前端添加审核确认界面(右下角固定框)
- 为每个子图创建分步测试代码
- 添加功能实现文档
This commit is contained in:
2026-04-25 13:10:31 +08:00
parent 851d52ed8d
commit 1038b5fb29
9 changed files with 1981 additions and 4 deletions

View File

@@ -0,0 +1,60 @@
"""
公共工具模块
提供可复用的基础组件
导出:
- formatter: 格式化输出工具
- intent: 意图理解工具
- human_review: 人工审核工具
"""
from .formatter import (
MarkdownFormatter,
TemplateManager,
OutputRenderer,
PresetTemplates
)
from .intent import (
IntentType,
Intent,
Entity,
IntentParser,
RuleBasedIntentClassifier,
RuleBasedEntityExtractor,
IntentRegistry,
create_default_intent_parser
)
from .human_review import (
ReviewStatus,
HumanReview,
HumanReviewStore,
InMemoryReviewStore,
HumanReviewNode,
ReviewManager
)
__all__ = [
# formatter
"MarkdownFormatter",
"TemplateManager",
"OutputRenderer",
"PresetTemplates",
# intent
"IntentType",
"Intent",
"Entity",
"IntentParser",
"RuleBasedIntentClassifier",
"RuleBasedEntityExtractor",
"IntentRegistry",
"create_default_intent_parser",
# human_review
"ReviewStatus",
"HumanReview",
"HumanReviewStore",
"InMemoryReviewStore",
"HumanReviewNode",
"ReviewManager"
]

View File

@@ -0,0 +1,481 @@
"""
格式化输出工具模块
提供基于 Jinja2 模板的 Markdown 格式化输出能力
功能:
1. TemplateManager - 模板管理器,支持加载和渲染 Jinja2 模板
2. MarkdownFormatter - Markdown 格式化工具,提供常用格式(表格、列表、引用等)
3. OutputRenderer - 输出渲染器,统一接口生成最终输出
4. PresetTemplates - 预置模板(对话摘要、报告、列表等)
"""
import os
from pathlib import Path
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from abc import ABC, abstractmethod
# 尝试导入 Jinja2如果没有则提供基础实现
try:
from jinja2 import Template as JinjaTemplate, Environment, BaseLoader
HAS_JINJA2 = True
except ImportError:
HAS_JINJA2 = False
class BaseFormatter(ABC):
"""格式化器基类"""
@abstractmethod
def format(self, data: Any) -> str:
"""格式化数据为字符串"""
pass
class MarkdownFormatter(BaseFormatter):
"""Markdown 格式化工具"""
@staticmethod
def table(data: List[Dict[str, Any]], headers: Optional[List[str]] = None) -> str:
"""
生成 Markdown 表格
Args:
data: 数据列表,每个元素是一个字典
headers: 表头列表,如果为 None 则使用字典的键
Returns:
Markdown 表格字符串
"""
if not data:
return ""
if headers is None:
headers = list(data[0].keys()) if data else []
if not headers:
return ""
lines = []
# 表头行
header_line = "| " + " | ".join(str(h) for h in headers) + " |"
lines.append(header_line)
# 分隔线
separator_line = "| " + " | ".join("---" for _ in headers) + " |"
lines.append(separator_line)
# 数据行
for row in data:
row_values = [str(row.get(h, "")) for h in headers]
row_line = "| " + " | ".join(row_values) + " |"
lines.append(row_line)
return "\n".join(lines)
@staticmethod
def bullet_list(items: List[str], indent: int = 0) -> str:
"""
生成无序列表
Args:
items: 列表项
indent: 缩进层级
Returns:
Markdown 无序列表字符串
"""
indent_str = " " * indent
return "\n".join(f"{indent_str}- {item}" for item in items)
@staticmethod
def numbered_list(items: List[str], start: int = 1, indent: int = 0) -> str:
"""
生成有序列表
Args:
items: 列表项
start: 起始编号
indent: 缩进层级
Returns:
Markdown 有序列表字符串
"""
indent_str = " " * indent
return "\n".join(f"{indent_str}{i}. {item}" for i, item in enumerate(items, start=start))
@staticmethod
def quote(text: str, author: Optional[str] = None) -> str:
"""
生成引用块
Args:
text: 引用文本
author: 作者(可选)
Returns:
Markdown 引用块字符串
"""
quoted_lines = "\n".join(f"> {line}" for line in text.split("\n"))
if author:
quoted_lines += f"\n> — {author}"
return quoted_lines
@staticmethod
def code(code: str, language: str = "") -> str:
"""
生成代码块
Args:
code: 代码内容
language: 语言标识符
Returns:
Markdown 代码块字符串
"""
return f"```{language}\n{code}\n```"
@staticmethod
def heading(text: str, level: int = 1) -> str:
"""
生成标题
Args:
text: 标题文本
level: 标题级别1-6
Returns:
Markdown 标题字符串
"""
level = max(1, min(6, level))
return f"{'#' * level} {text}"
@staticmethod
def link(text: str, url: str) -> str:
"""
生成链接
Args:
text: 链接文本
url: 链接地址
Returns:
Markdown 链接字符串
"""
return f"[{text}]({url})"
@staticmethod
def bold(text: str) -> str:
"""生成粗体"""
return f"**{text}**"
@staticmethod
def italic(text: str) -> str:
"""生成斜体"""
return f"*{text}*"
@staticmethod
def divider() -> str:
"""生成分割线"""
return "---"
def format(self, data: Any) -> str:
"""实现基类方法,根据数据类型自动选择格式化方式"""
if isinstance(data, list):
if len(data) > 0 and isinstance(data[0], dict):
return self.table(data)
else:
return self.bullet_list([str(item) for item in data])
elif isinstance(data, dict):
return self.table([data])
else:
return str(data)
@dataclass
class Template:
"""模板数据类"""
name: str
content: str
description: str = ""
class DictLoader(BaseLoader):
"""字典模板加载器"""
用于从内存字典中加载模板
"""
def __init__(self, templates: Dict[str, str]):
self.templates = templates
def get_source(self, environment, template):
if template not in self.templates:
raise TemplateNotFound(template)
source = self.templates[template]
return source, None, lambda: True
class TemplateManager:
"""Jinja2 模板管理器"""
def __init__(self, template_dir: Optional[Path] = None):
"""
初始化模板管理器
Args:
template_dir: 模板目录路径
"""
self._templates: Dict[str, Template] = {}
self.template_dir = template_dir
self._env: Optional[Environment] = None
if HAS_JINJA2:
self._env = Environment(loader=DictLoader({}))
def _refresh_env(self) -> None:
"""刷新 Jinja2 环境"""
if HAS_JINJA2 and self._env is not None:
template_dict = {name: t.content for name, t in self._templates.items()}
self._env = Environment(loader=DictLoader(template_dict))
def add_template(self, name: str, content: str, description: str = "") -> None:
"""
添加模板
Args:
name: 模板名称
content: 模板内容
description: 模板描述
"""
self._templates[name] = Template(name=name, content=content, description=description)
self._refresh_env()
def load_template(self, name: str, file_path: Path) -> None:
"""
从文件加载模板
Args:
name: 模板名称
file_path: 模板文件路径
"""
if file_path.exists():
content = file_path.read_text(encoding='utf-8')
self.add_template(name, content, f"从文件加载: {file_path}")
def get_template(self, name: str) -> Optional[Template]:
"""
获取模板
Args:
name: 模板名称
Returns:
模板对象如果不存在返回 None
"""
return self._templates.get(name)
def render(self, template_name: str, context: Dict[str, Any]) -> str:
"""
渲染模板
Args:
template_name: 模板名称
context: 渲染上下文
Returns:
渲染后的字符串
"""
template = self.get_template(template_name)
if template is None:
raise ValueError(f"模板不存在: {template_name}")
return self.render_string(template.content, context)
def render_string(self, template_string: str, context: Dict[str, Any]) -> str:
"""
渲染模板字符串
Args:
template_string: 模板字符串
context: 渲染上下文
Returns:
渲染后的字符串
"""
if HAS_JINJA2 and self._env is not None:
try:
jinja_template = self._env.from_string(template_string)
return jinja_template.render(**context)
except Exception:
# 如果 Jinja2 渲染失败,使用简单替换
pass
# 简单的字符串替换作为备选方案
result = template_string
for key, value in context.items():
result = result.replace(f"{{{{{key}}}}}", str(value))
result = result.replace(f"{{{{ {key} }}}}", str(value))
return result
class PresetTemplates:
"""预置模板集合"""
@staticmethod
def conversation_summary() -> str:
"""对话摘要模板"""
return """# 对话摘要
**时间**: {{ timestamp }}
**参与者**: {{ participants }}
---
## 对话要点
{{ bullet_list(points) }}
---
## 总结
{{ summary }}
"""
@staticmethod
def research_report() -> str:
"""研究报告模板"""
return """# {{ title }}
**日期**: {{ date }}
**作者**: {{ author }}
---
## 摘要
{{ summary }}
---
## 发现
{{ bullet_list(findings) }}
---
## 数据来源
{{ sources }}
"""
@staticmethod
def task_list() -> str:
"""任务列表模板"""
return """# 任务列表
**更新时间**: {{ update_time }}
---
## 待办
{{ numbered_list(todos) }}
---
## 已完成
{{ numbered_list(completed) }}
"""
@staticmethod
def data_summary() -> str:
"""数据摘要模板"""
return """# 数据摘要
**生成时间**: {{ timestamp }}
---
## 数据概览
{{ table(data_overview) }}
---
## 关键指标
{{ bullet_list(metrics) }}
"""
class OutputRenderer:
"""输出渲染器"""
def __init__(self, template_manager: Optional[TemplateManager] = None):
"""
初始化输出渲染器
Args:
template_manager: 模板管理器
"""
self.template_manager = template_manager or TemplateManager()
self.markdown = MarkdownFormatter()
# 自动注册预置模板
self._register_presets()
def _register_presets(self) -> None:
"""注册预置模板"""
self.template_manager.add_template(
"conversation_summary",
PresetTemplates.conversation_summary(),
"对话摘要模板"
)
self.template_manager.add_template(
"research_report",
PresetTemplates.research_report(),
"研究报告模板"
)
self.template_manager.add_template(
"task_list",
PresetTemplates.task_list(),
"任务列表模板"
)
self.template_manager.add_template(
"data_summary",
PresetTemplates.data_summary(),
"数据摘要模板"
)
def render(self, template_name: str, context: Dict[str, Any]) -> str:
"""
使用模板渲染输出
Args:
template_name: 模板名称
context: 渲染上下文
Returns:
渲染后的字符串
"""
# 将格式化工具注入上下文
render_context = context.copy()
render_context["bullet_list"] = self.markdown.bullet_list
render_context["numbered_list"] = self.markdown.numbered_list
render_context["table"] = self.markdown.table
render_context["quote"] = self.markdown.quote
render_context["code"] = self.markdown.code
render_context["heading"] = self.markdown.heading
render_context["link"] = self.markdown.link
render_context["bold"] = self.markdown.bold
render_context["italic"] = self.markdown.italic
render_context["divider"] = self.markdown.divider
return self.template_manager.render(template_name, render_context)
def render_plain(self, data: Any) -> str:
"""
直接格式化数据为 Markdown
Args:
data: 数据
Returns:
格式化后的字符串
"""
return self.markdown.format(data)

View File

@@ -0,0 +1,465 @@
"""
人工审核工具模块
提供 LangGraph interrupt 机制和状态持久化能力
功能:
1. HumanReview - 人工审核数据类
2. ReviewStatus - 审核状态枚举
3. HumanReviewStore - 审核存储接口
4. InMemoryReviewStore - 内存存储实现
5. HumanReviewNode - LangGraph 审核节点
6. ReviewManager - 审核管理器
"""
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum, auto
from abc import ABC, abstractmethod
from datetime import datetime
import uuid
class ReviewStatus(Enum):
"""审核状态枚举"""
PENDING = auto() # 待审核
APPROVED = auto() # 已通过
REJECTED = auto() # 已拒绝
MODIFIED = auto() # 已修改
TIMEOUT = auto() # 已超时
@dataclass
class HumanReview:
"""人工审核数据类"""
review_id: str # 审核ID
thread_id: str # 线程ID
user_id: str # 用户ID
status: ReviewStatus # 审核状态
content_to_review: str # 待审核内容
review_comment: str = "" # 审核意见
modified_content: str = "" # 修改后的内容
created_at: datetime = field(default_factory=datetime.now)
reviewed_at: Optional[datetime] = None
reviewer: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
class HumanReviewStore(ABC):
"""审核存储接口"""
@abstractmethod
def save(self, review: HumanReview) -> None:
"""
保存审核
Args:
review: 审核对象
"""
pass
@abstractmethod
def get(self, review_id: str) -> Optional[HumanReview]:
"""
获取审核
Args:
review_id: 审核ID
Returns:
审核对象,如果不存在返回 None
"""
pass
@abstractmethod
def get_by_thread(self, thread_id: str) -> List[HumanReview]:
"""
获取线程的所有审核
Args:
thread_id: 线程ID
Returns:
审核列表
"""
pass
@abstractmethod
def get_pending(self, limit: int = 100) -> List[HumanReview]:
"""
获取待审核的列表
Args:
limit: 返回数量限制
Returns:
待审核列表
"""
pass
@abstractmethod
def update_status(
self,
review_id: str,
status: ReviewStatus,
reviewer: Optional[str] = None,
comment: str = "",
modified_content: str = ""
) -> bool:
"""
更新审核状态
Args:
review_id: 审核ID
status: 新状态
reviewer: 审核人
comment: 审核意见
modified_content: 修改后的内容
Returns:
是否成功
"""
pass
class InMemoryReviewStore(HumanReviewStore):
"""内存存储实现"""
def __init__(self):
self._reviews: Dict[str, HumanReview] = {}
def save(self, review: HumanReview) -> None:
"""
保存审核
Args:
review: 审核对象
"""
self._reviews[review.review_id] = review
def get(self, review_id: str) -> Optional[HumanReview]:
"""
获取审核
Args:
review_id: 审核ID
Returns:
审核对象,如果不存在返回 None
"""
return self._reviews.get(review_id)
def get_by_thread(self, thread_id: str) -> List[HumanReview]:
"""
获取线程的所有审核
Args:
thread_id: 线程ID
Returns:
审核列表
"""
return [
review for review in self._reviews.values()
if review.thread_id == thread_id
]
def get_pending(self, limit: int = 100) -> List[HumanReview]:
"""
获取待审核的列表
Args:
limit: 返回数量限制
Returns:
待审核列表
"""
pending = [
review for review in self._reviews.values()
if review.status == ReviewStatus.PENDING
]
pending.sort(key=lambda r: r.created_at)
return pending[:limit]
def update_status(
self,
review_id: str,
status: ReviewStatus,
reviewer: Optional[str] = None,
comment: str = "",
modified_content: str = ""
) -> bool:
"""
更新审核状态
Args:
review_id: 审核ID
status: 新状态
reviewer: 审核人
comment: 审核意见
modified_content: 修改后的内容
Returns:
是否成功
"""
review = self._reviews.get(review_id)
if review is None:
return False
review.status = status
review.review_comment = comment
review.modified_content = modified_content
review.reviewer = reviewer
review.reviewed_at = datetime.now()
return True
class HumanReviewNode:
"""LangGraph 审核节点"""
def __init__(
self,
store: HumanReviewStore,
should_review: Optional[Callable[[Any], bool]] = None
):
"""
初始化审核节点
Args:
store: 审核存储
should_review: 判断是否需要审核的函数
"""
self.store = store
self.should_review = should_review or (lambda state: True)
def create_review(
self,
state: Any,
thread_id: str,
user_id: str,
content_to_review: str
) -> str:
"""
创建审核
Args:
state: 状态
thread_id: 线程ID
user_id: 用户ID
content_to_review: 待审核内容
Returns:
审核ID
"""
review_id = str(uuid.uuid4())
review = HumanReview(
review_id=review_id,
thread_id=thread_id,
user_id=user_id,
status=ReviewStatus.PENDING,
content_to_review=content_to_review
)
self.store.save(review)
return review_id
def check_review_status(self, review_id: str) -> Optional[ReviewStatus]:
"""
检查审核状态
Args:
review_id: 审核ID
Returns:
审核状态,如果不存在返回 None
"""
review = self.store.get(review_id)
return review.status if review else None
def get_review_result(self, review_id: str) -> Optional[HumanReview]:
"""
获取审核结果
Args:
review_id: 审核ID
Returns:
审核对象,如果不存在返回 None
"""
return self.store.get(review_id)
async def __call__(self, state: Any) -> Dict[str, Any]:
"""
节点执行方法LangGraph 兼容)
Args:
state: 状态
Returns:
更新后的状态
"""
# 检查是否需要审核
if not self.should_review(state):
return {"review_skipped": True}
# 从状态中提取信息
thread_id = getattr(state, "thread_id", str(uuid.uuid4()))
user_id = getattr(state, "user_id", "default_user")
# 获取待审核内容
content_to_review = ""
if hasattr(state, "messages") and state.messages:
last_msg = state.messages[-1] if state.messages else None
if last_msg and hasattr(last_msg, "content"):
content_to_review = last_msg.content
# 创建审核
review_id = self.create_review(state, thread_id, user_id, content_to_review)
# 返回状态更新
return {
"review_id": review_id,
"review_pending": True,
"interrupt": True # 标记需要中断
}
class ReviewManager:
"""审核管理器"""
def __init__(self, store: Optional[HumanReviewStore] = None):
"""
初始化审核管理器
Args:
store: 审核存储
"""
self.store = store or InMemoryReviewStore()
def request_review(
self,
thread_id: str,
user_id: str,
content: str,
metadata: Optional[Dict[str, Any]] = None
) -> str:
"""
请求审核
Args:
thread_id: 线程ID
user_id: 用户ID
content: 待审核内容
metadata: 元数据
Returns:
审核ID
"""
review_id = str(uuid.uuid4())
review = HumanReview(
review_id=review_id,
thread_id=thread_id,
user_id=user_id,
status=ReviewStatus.PENDING,
content_to_review=content,
metadata=metadata or {}
)
self.store.save(review)
return review_id
def approve(
self,
review_id: str,
reviewer: str,
comment: str = ""
) -> bool:
"""
审核通过
Args:
review_id: 审核ID
reviewer: 审核人
comment: 审核意见
Returns:
是否成功
"""
return self.store.update_status(
review_id=review_id,
status=ReviewStatus.APPROVED,
reviewer=reviewer,
comment=comment
)
def reject(
self,
review_id: str,
reviewer: str,
comment: str = ""
) -> bool:
"""
审核拒绝
Args:
review_id: 审核ID
reviewer: 审核人
comment: 审核意见
Returns:
是否成功
"""
return self.store.update_status(
review_id=review_id,
status=ReviewStatus.REJECTED,
reviewer=reviewer,
comment=comment
)
def modify(
self,
review_id: str,
reviewer: str,
modified_content: str,
comment: str = ""
) -> bool:
"""
审核修改
Args:
review_id: 审核ID
reviewer: 审核人
modified_content: 修改后的内容
comment: 审核意见
Returns:
是否成功
"""
return self.store.update_status(
review_id=review_id,
status=ReviewStatus.MODIFIED,
reviewer=reviewer,
comment=comment,
modified_content=modified_content
)
def get_pending_reviews(self, limit: int = 100) -> List[HumanReview]:
"""
获取待审核列表
Args:
limit: 返回数量限制
Returns:
待审核列表
"""
return self.store.get_pending(limit)
def get_review(self, review_id: str) -> Optional[HumanReview]:
"""
获取审核详情
Args:
review_id: 审核ID
Returns:
审核对象,如果不存在返回 None
"""
return self.store.get(review_id)

View File

@@ -0,0 +1,427 @@
"""
意图理解工具模块
提供标准化的意图分类和信息提取能力
功能:
1. Intent - 意图数据类
2. IntentClassifier - 意图分类器
3. EntityExtractor - 实体提取器
4. IntentParser - 完整的意图解析器
5. IntentRegistry - 意图注册器
"""
import re
from typing import Dict, List, Any, Optional, Set, Tuple, Callable
from dataclasses import dataclass, field
from enum import Enum, auto
from abc import ABC, abstractmethod
class IntentType(Enum):
"""意图类型枚举"""
UNKNOWN = auto()
GREETING = auto() # 问候
QUESTION = auto() # 提问
REQUEST = auto() # 请求
COMMAND = auto() # 命令
INFORM = auto() # 告知信息
CONFIRM = auto() # 确认
DENY = auto() # 否认
THANKS = auto() # 感谢
GOODBYE = auto() # 告别
COMPLAINT = auto() # 投诉
PRAISE = auto() # 表扬
CLARIFY = auto() # 澄清
SUGGEST = auto() # 建议
@dataclass
class Entity:
"""实体数据类"""
entity_type: str # 实体类型
value: str # 实体值
start_pos: int = 0 # 起始位置
end_pos: int = 0 # 结束位置
confidence: float = 1.0 # 置信度
metadata: Dict[str, Any] = field(default_factory=dict) # 元数据
@dataclass
class Intent:
"""意图数据类"""
intent_type: IntentType # 意图类型
confidence: float = 1.0 # 置信度
entities: List[Entity] = field(default_factory=list) # 提取的实体
parameters: Dict[str, Any] = field(default_factory=dict) # 参数
original_text: str = "" # 原始文本
normalized_text: str = "" # 标准化后的文本
metadata: Dict[str, Any] = field(default_factory=dict) # 元数据
class BaseIntentClassifier(ABC):
"""意图分类器基类"""
@abstractmethod
def classify(self, text: str) -> Tuple[IntentType, float]:
"""
分类意图
Args:
text: 输入文本
Returns:
(意图类型, 置信度)
"""
pass
@abstractmethod
def classify_with_scores(self, text: str) -> Dict[IntentType, float]:
"""
分类意图,返回所有类型的置信度
Args:
text: 输入文本
Returns:
{意图类型: 置信度}
"""
pass
class RuleBasedIntentClassifier(BaseIntentClassifier):
"""基于规则的意图分类器"""
def __init__(self):
self._rules: Dict[IntentType, Set[str]] = {}
self._initialize_default_rules()
def _initialize_default_rules(self) -> None:
"""初始化默认规则"""
# 问候
self.add_rule(IntentType.GREETING, {
"你好", "您好", "hi", "hello", "hey", "早上好", "下午好", "晚上好", "哈喽"
})
# 告别
self.add_rule(IntentType.GOODBYE, {
"再见", "拜拜", "bye", "goodbye", "回见", "下次见", "再见了"
})
# 感谢
self.add_rule(IntentType.THANKS, {
"谢谢", "感谢", "多谢", "thanks", "thank you", "3q", "谢谢了"
})
# 确认
self.add_rule(IntentType.CONFIRM, {
"是的", "", "没错", "好的", "可以", "", "同意", "确认", "yes", "yep"
})
# 否认
self.add_rule(IntentType.DENY, {
"", "不是", "不对", "不行", "不要", "拒绝", "no", "nope", "没有"
})
# 提问
self.add_rule(IntentType.QUESTION, {
"?", "", "什么", "怎么", "如何", "为什么", "", "", "多少", "", ""
})
def add_rule(self, intent_type: IntentType, keywords: Set[str]) -> None:
"""
添加规则
Args:
intent_type: 意图类型
keywords: 关键词集合
"""
if intent_type not in self._rules:
self._rules[intent_type] = set()
self._rules[intent_type].update(keywords)
def classify(self, text: str) -> Tuple[IntentType, float]:
"""
分类意图
Args:
text: 输入文本
Returns:
(意图类型, 置信度)
"""
scores = self.classify_with_scores(text)
if not scores:
return IntentType.UNKNOWN, 0.0
best_intent = max(scores.items(), key=lambda x: x[1])
return best_intent[0], best_intent[1]
def classify_with_scores(self, text: str) -> Dict[IntentType, float]:
"""
分类意图,返回所有类型的置信度
Args:
text: 输入文本
Returns:
{意图类型: 置信度}
"""
scores: Dict[IntentType, float] = {}
normalized_text = text.lower()
for intent_type, keywords in self._rules.items():
match_count = 0
for keyword in keywords:
if keyword.lower() in normalized_text:
match_count += 1
if match_count > 0:
scores[intent_type] = min(1.0, match_count / 3.0)
# 如果没有匹配返回UNKNOWN
if not scores:
scores[IntentType.UNKNOWN] = 0.5
return scores
class BaseEntityExtractor(ABC):
"""实体提取器基类"""
@abstractmethod
def extract(self, text: str) -> List[Entity]:
"""
提取实体
Args:
text: 输入文本
Returns:
实体列表
"""
pass
class RuleBasedEntityExtractor(BaseEntityExtractor):
"""基于规则的实体提取器"""
def __init__(self):
self._patterns: Dict[str, re.Pattern] = {} # 正则模式
self._keywords: Dict[str, Set[str]] = {} # 关键词列表
self._initialize_default_patterns()
def _initialize_default_patterns(self) -> None:
"""初始化默认模式"""
# 邮箱
self.add_regex_pattern(
"email",
r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
)
# 电话号码
self.add_regex_pattern(
"phone",
r'1[3-9]\d{9}'
)
# 日期(简单模式)
self.add_regex_pattern(
"date",
r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}[日号]?|\d{1,2}[-/月]\d{1,2}[日号]?'
)
# 数字
self.add_regex_pattern(
"number",
r'\d+\.?\d*'
)
def add_regex_pattern(self, entity_type: str, pattern: str) -> None:
"""
添加正则匹配规则
Args:
entity_type: 实体类型
pattern: 正则表达式
"""
try:
self._patterns[entity_type] = re.compile(pattern, re.IGNORECASE)
except re.error:
pass
def add_keywords(self, entity_type: str, keywords: Set[str]) -> None:
"""
添加关键词匹配规则
Args:
entity_type: 实体类型
keywords: 关键词集合
"""
if entity_type not in self._keywords:
self._keywords[entity_type] = set()
self._keywords[entity_type].update(keywords)
def extract(self, text: str) -> List[Entity]:
"""
提取实体
Args:
text: 输入文本
Returns:
实体列表
"""
entities: List[Entity] = []
# 正则匹配
for entity_type, pattern in self._patterns.items():
for match in pattern.finditer(text):
entity = Entity(
entity_type=entity_type,
value=match.group(),
start_pos=match.start(),
end_pos=match.end(),
confidence=0.95
)
entities.append(entity)
# 关键词匹配
for entity_type, keywords in self._keywords.items():
for keyword in keywords:
start_idx = 0
while True:
pos = text.lower().find(keyword.lower(), start_idx)
if pos == -1:
break
entity = Entity(
entity_type=entity_type,
value=text[pos:pos + len(keyword)],
start_pos=pos,
end_pos=pos + len(keyword),
confidence=0.9
)
entities.append(entity)
start_idx = pos + len(keyword)
# 按位置排序
entities.sort(key=lambda e: e.start_pos)
return entities
class IntentRegistry:
"""意图注册器"""
def __init__(self):
self._intent_handlers: Dict[IntentType, Callable] = {}
def register(self, intent_type: IntentType, handler: Callable) -> None:
"""
注册意图处理器
Args:
intent_type: 意图类型
handler: 处理器
"""
self._intent_handlers[intent_type] = handler
def get_handler(self, intent_type: IntentType) -> Optional[Callable]:
"""
获取意图处理器
Args:
intent_type: 意图类型
Returns:
处理器,如果不存在返回 None
"""
return self._intent_handlers.get(intent_type)
class IntentParser:
"""完整的意图解析器"""
def __init__(
self,
classifier: Optional[BaseIntentClassifier] = None,
extractor: Optional[BaseEntityExtractor] = None,
registry: Optional[IntentRegistry] = None
):
"""
初始化意图解析器
Args:
classifier: 意图分类器
extractor: 实体提取器
registry: 意图注册器
"""
self.classifier = classifier or RuleBasedIntentClassifier()
self.extractor = extractor or RuleBasedEntityExtractor()
self.registry = registry or IntentRegistry()
def parse(self, text: str) -> Intent:
"""
解析文本,返回完整的意图对象
Args:
text: 输入文本
Returns:
意图对象
"""
# 分类意图
intent_type, confidence = self.classifier.classify(text)
# 提取实体
entities = self.extractor.extract(text)
# 构建意图对象
intent = Intent(
intent_type=intent_type,
confidence=confidence,
entities=entities,
original_text=text,
normalized_text=text.lower().strip()
)
# 从实体中提取参数
for entity in entities:
intent.parameters[entity.entity_type] = entity.value
return intent
def parse_and_execute(self, text: str, context: Optional[Dict[str, Any]] = None) -> Any:
"""
解析文本并执行对应的处理器
Args:
text: 输入文本
context: 上下文
Returns:
执行结果
"""
intent = self.parse(text)
handler = self.registry.get_handler(intent.intent_type)
if handler:
return handler(intent, context or {})
return None
def create_default_intent_parser() -> IntentParser:
"""
创建默认配置的意图解析器
Returns:
配置好的意图解析器
"""
parser = IntentParser()
# 注册默认处理器
def greeting_handler(intent: Intent, context: Dict) -> str:
return "你好!很高兴为你服务。"
def thanks_handler(intent: Intent, context: Dict) -> str:
return "不客气!"
def goodbye_handler(intent: Intent, context: Dict) -> str:
return "再见!有需要随时找我。"
parser.registry.register(IntentType.GREETING, greeting_handler)
parser.registry.register(IntentType.THANKS, thanks_handler)
parser.registry.register(IntentType.GOODBYE, goodbye_handler)
return parser

View File

@@ -0,0 +1,10 @@
"""
状态基类工具模块
提供类型安全的 LangGraph 状态基类和常用状态操作工具
功能:
1. BaseState - 基础状态基类包含通用字段消息、token统计、耗时等
2. StateUtils - 状态操作工具类,提供常用的状态访问和修改方法
3. TypedStateBuilder - 类型化状态构建器,支持链式创建自定义状态
4. StateValidation - 状态验证工具,确保状态完整性
"""