""" 嵌入模型包装器 - 直接使用统一嵌入服务 支持自动降级(本地 llama.cpp → 智谱),由 get_embedding_service() 内部处理 """ import sys import logging from typing import List from pathlib import Path # 添加父目录到路径,支持从 app.model_services 导入 backend_root = Path(__file__).parent.parent if str(backend_root) not in sys.path: sys.path.insert(0, str(backend_root)) from .config import LLAMACPP_EMBEDDING_URL, LLAMACPP_API_KEY from langchain_core.embeddings import Embeddings logger = logging.getLogger(__name__) class LlamaCppEmbedder: """ 嵌入器包装类 - 直接使用统一的 get_embedding_service() 降级逻辑完全由 app.model_services 处理 """ def __init__(self, model: str = "Qwen3-Embedding-0.6B-Q8_0", use_fallback: bool = True): """ Args: model: 嵌入模型名称(向后兼容,现在实际使用统一服务) use_fallback: 是否使用降级机制(保留参数,现在始终为 True) """ self.model = model self._fallback_embeddings = None # 直接获取统一嵌入服务 try: from app.model_services import get_embedding_service self._fallback_embeddings = get_embedding_service() logger.info("✅ 统一嵌入服务加载成功") except Exception as e: logger.warning(f"⚠️ 无法加载统一嵌入服务: {e}") # 保留向后兼容的初始化 self.base_url = LLAMACPP_EMBEDDING_URL self.api_key = LLAMACPP_API_KEY def as_langchain_embeddings(self) -> Embeddings: """创建 LangChain 兼容的嵌入实例""" if self._fallback_embeddings: logger.info("✅ 使用统一嵌入服务(已内置降级机制)") return self._fallback_embeddings # 向后兼容,仅在统一服务不可用时使用传统方式 logger.warning("⚠️ 统一服务不可用,使用传统模式(不推荐)") return _LlamaCppLangchainAdapter(self) def embed_documents(self, texts: List[str]) -> List[List[float]]: """嵌入一批文档""" if self._fallback_embeddings: return self._fallback_embeddings.embed_documents(texts) # 向后兼容 return self._call_embedding_api(texts) def embed_query(self, text: str) -> List[float]: """嵌入单个查询""" if self._fallback_embeddings: return self._fallback_embeddings.embed_query(text) # 向后兼容 return self._call_embedding_api([text])[0] def get_embedding_dimension(self) -> int: """通过嵌入测试字符串获取嵌入维度""" test_embedding = self.embed_query("test") return len(test_embedding) def _call_embedding_api(self, texts: List[str]) -> List[List[float]]: """仅作为向后兼容的备用方法""" import httpx if not hasattr(self, 'base_url') or not self.base_url: raise ValueError("LLAMACPP_EMBEDDING_URL 未配置且统一服务不可用") headers = {"Content-Type": "application/json"} if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" base = self.base_url.rstrip("/") if not base.endswith("/v1"): base = base + "/v1" payload = { "input": texts, "model": self.model, } with httpx.Client(timeout=120) as client: response = client.post( f"{base}/embeddings", headers=headers, json=payload, ) response.raise_for_status() data = response.json() if isinstance(data, list): return [item["embedding"] for item in data] elif isinstance(data, dict) and "data" in data: return [item["embedding"] for item in sorted(data["data"], key=lambda x: x["index"])] else: raise ValueError(f"未知的嵌入 API 响应格式: {data}") class _LlamaCppLangchainAdapter(Embeddings): """仅作为向后兼容的适配器""" def __init__(self, embedder: LlamaCppEmbedder): self._embedder = embedder def embed_documents(self, texts: List[str]) -> List[List[float]]: return self._embedder.embed_documents(texts) def embed_query(self, text: str) -> List[float]: return self._embedder.embed_query(text)