Files
ailine/backend/app/db/init_db.py

104 lines
3.0 KiB
Python
Raw Permalink Normal View History

"""
Subgraph database table initialization.
Creates the tables when FastAPI starts up.
"""
from typing import Optional
async def init_subgraph_tables(conn):
    """Create every table the subgraphs need.

    The tables are created one after another in a fixed order:
    contacts (address book), words (dictionary), news (news items).

    Args:
        conn: Database connection (per the original note, it comes from
            AsyncPostgresSaver).
    """
    table_creators = (
        _create_contacts_table,  # 1. contacts table (address book)
        _create_words_table,  # 2. words table (dictionary)
        _create_news_table,  # 3. news table (news items)
    )
    for create_table in table_creators:
        await create_table(conn)
async def _create_contacts_table(conn):
    """Create the ``contacts`` table and the index on its ``user_id`` column.

    Best-effort: any ``Exception`` raised along the way is caught and printed
    instead of being propagated to the caller.

    Args:
        conn: Database connection whose ``cursor()`` is usable as an async
            context manager.
    """
    # The DDL text stays flush-left so the statement sent to the database is
    # exactly the one in the original source.
    ddl = """
CREATE TABLE IF NOT EXISTS contacts (
id VARCHAR(64) PRIMARY KEY,
user_id VARCHAR(64) NOT NULL,
name VARCHAR(100) NOT NULL,
phone VARCHAR(32),
email VARCHAR(100),
company VARCHAR(100),
position VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(ddl)
        # NOTE(review): the source's indentation was lost; the index step is
        # assumed to run after the cursor block closes - confirm.
        await _create_index_if_not_exists(
            conn, "idx_contacts_user", "contacts", "user_id"
        )
    except Exception as exc:
        print(f"contacts 表可能已存在: {exc}")
async def _create_words_table(conn):
    """Create the ``words`` table plus indexes on ``user_id`` and ``word``.

    Best-effort: any ``Exception`` raised along the way is caught and printed
    instead of being propagated to the caller.

    Args:
        conn: Database connection whose ``cursor()`` is usable as an async
            context manager.
    """
    # The DDL text stays flush-left so the statement sent to the database is
    # exactly the one in the original source.
    ddl = """
CREATE TABLE IF NOT EXISTS words (
id VARCHAR(64) PRIMARY KEY,
user_id VARCHAR(64) NOT NULL,
word VARCHAR(100) NOT NULL,
phonetic VARCHAR(100),
part_of_speech VARCHAR(50),
definition TEXT,
examples TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
    # (index name, indexed column) pairs, created in this order.
    index_specs = (
        ("idx_words_user", "user_id"),
        ("idx_words_word", "word"),
    )
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(ddl)
        # NOTE(review): the source's indentation was lost; the index steps are
        # assumed to run after the cursor block closes - confirm.
        for index_name, column in index_specs:
            await _create_index_if_not_exists(conn, index_name, "words", column)
    except Exception as exc:
        print(f"words 表可能已存在: {exc}")
async def _create_news_table(conn):
    """Create the ``news`` table and the index on its ``user_id`` column.

    Best-effort: any ``Exception`` raised along the way is caught and printed
    instead of being propagated to the caller.

    Args:
        conn: Database connection whose ``cursor()`` is usable as an async
            context manager.
    """
    # The DDL text stays flush-left so the statement sent to the database is
    # exactly the one in the original source.
    ddl = """
CREATE TABLE IF NOT EXISTS news (
id VARCHAR(64) PRIMARY KEY,
user_id VARCHAR(64) NOT NULL,
title VARCHAR(200) NOT NULL,
content TEXT,
url VARCHAR(500),
source VARCHAR(100),
keywords TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(ddl)
        # NOTE(review): the source's indentation was lost; the index step is
        # assumed to run after the cursor block closes - confirm.
        await _create_index_if_not_exists(
            conn, "idx_news_user", "news", "user_id"
        )
    except Exception as exc:
        print(f"news 表可能已存在: {exc}")
async def _create_index_if_not_exists(conn, index_name: str, table_name: str, column_name: str):
    """Create a single-column index unless one with that name already exists.

    Best-effort: any ``Exception`` raised while executing the statement is
    caught and printed instead of being propagated to the caller.

    Args:
        conn: Database connection whose ``cursor()`` is usable as an async
            context manager.
        index_name: Name of the index to create.
        table_name: Table the index is built on.
        column_name: Column covered by the index.
    """
    # The identifiers are interpolated straight into the statement text, so
    # callers must pass trusted names only.
    statement = f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} ({column_name});"
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(statement)
    except Exception as exc:
        print(f"索引 {index_name} 可能已存在: {exc}")