Files
011-ai-interview/backend/app/routers/admin.py
2026-01-23 13:57:48 +08:00

240 lines
7.0 KiB
Python

"""
Admin back-office management API.
"""
from fastapi import APIRouter, HTTPException, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel
from typing import Optional, List, Any
from loguru import logger
import httpx
import json
import secrets
from app.config import settings
router = APIRouter(prefix="/api/admin", tags=["admin"])
security = HTTPBasic()

# Administrator credentials.
# SECURITY: "admin"/"admin" is kept only as a backward-compatible fallback.
# If the settings object exposes ADMIN_USERNAME / ADMIN_PASSWORD (non-empty),
# those values are used instead so real deployments do not depend on
# credentials committed to source control.
ADMIN_USERNAME = getattr(settings, "ADMIN_USERNAME", None) or "admin"
ADMIN_PASSWORD = getattr(settings, "ADMIN_PASSWORD", None) or "admin"

# Coze configuration: personal access token (from settings) and the id of the
# workflow that the helper in this module invokes to run SQL.
COZE_PAT_TOKEN = settings.COZE_PAT_TOKEN
WORKFLOW_QUERY_ID = "7597376294612107318"
def verify_admin(credentials: HTTPBasicCredentials = Depends(security)) -> str:
    """Validate HTTP Basic credentials against the administrator account.

    Args:
        credentials: Username/password pair extracted by the ``HTTPBasic``
            security scheme.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username or the password does not match.
    """
    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, which would surface as a 500 instead of a 401.
    # Comparing UTF-8 bytes accepts any input and stays constant-time.
    is_username_correct = secrets.compare_digest(
        credentials.username.encode("utf-8"),
        ADMIN_USERNAME.encode("utf-8"),
    )
    is_password_correct = secrets.compare_digest(
        credentials.password.encode("utf-8"),
        ADMIN_PASSWORD.encode("utf-8"),
    )
    if not (is_username_correct and is_password_correct):
        raise HTTPException(
            status_code=401,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username
class LoginRequest(BaseModel):
    """Request body for the admin login endpoint."""
    # Account name submitted by the client.
    username: str
    # Password submitted by the client, compared as-is against the admin password.
    password: str
class ApiResponse(BaseModel):
    """Uniform response envelope used as ``response_model`` by the admin routes."""
    # Status code; defaults to 0 (paired with the default "success" message).
    code: int = 0
    # Human-readable status message.
    message: str = "success"
    # Endpoint-specific payload; None when the route returns no data.
    data: Any = None
@router.post("/login", response_model=ApiResponse)
async def login(request: LoginRequest):
    """Authenticate the administrator and return a login token.

    Args:
        request: Submitted username and password.

    Returns:
        ApiResponse whose ``data`` holds ``token`` and ``username``.

    Raises:
        HTTPException: 401 when the username or password is wrong.
    """
    # Constant-time comparison on UTF-8 bytes: avoids leaking how many leading
    # characters matched, and (unlike compare_digest on str) accepts
    # non-ASCII input without raising TypeError.
    username_ok = secrets.compare_digest(
        request.username.encode("utf-8"),
        ADMIN_USERNAME.encode("utf-8"),
    )
    password_ok = secrets.compare_digest(
        request.password.encode("utf-8"),
        ADMIN_PASSWORD.encode("utf-8"),
    )
    if username_ok and password_ok:
        # NOTE(review): the token is a fixed string, not a per-session secret;
        # nothing in the visible part of this file validates it on later calls.
        return ApiResponse(data={"token": "admin_token", "username": ADMIN_USERNAME})
    raise HTTPException(status_code=401, detail="用户名或密码错误")
async def execute_workflow(table: str, sql: str) -> Any:
    """Run the generic SQL-query Coze workflow and return its decoded output.

    Args:
        table: Logical table selector forwarded to the workflow
            ("assessments", "logs" or "config").
        sql: SQL statement forwarded to the workflow verbatim. Callers are
            responsible for escaping any user-supplied values they embed.

    Returns:
        * the value under ``"output"`` when the workflow's ``data`` field
          decodes to a JSON object containing that key;
        * the decoded JSON value otherwise;
        * ``{"raw": <data>}`` when ``data`` cannot be decoded as JSON;
        * ``[]`` when ``data`` is missing or empty.

    Raises:
        httpx.HTTPStatusError: The Coze API answered with a 4xx/5xx status.
        HTTPException: 500 when the response body reports a non-zero ``code``.
    """
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {COZE_PAT_TOKEN}",
        "Content-Type": "application/json",
    }
    # The workflow takes one string parameter that itself contains JSON.
    input_data = json.dumps({"table": table, "sql": sql}, ensure_ascii=False)
    payload = {
        "workflow_id": WORKFLOW_QUERY_ID,
        "parameters": {"input": input_data},
    }

    logger.info(f"Execute workflow: table={table}, sql={sql[:80]}...")
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(url, json=payload, headers=headers)
        response.raise_for_status()
        data = response.json()
    logger.info(f"Workflow response code: {data.get('code')}")

    if data.get("code") != 0:
        error_msg = data.get("msg", "Unknown error")
        logger.error(f"Workflow error: {error_msg}")
        raise HTTPException(status_code=500, detail=error_msg)

    result_str = data.get("data", "")
    if not result_str:
        return []

    try:
        parsed = json.loads(result_str)
    except (TypeError, ValueError):
        # Not JSON text (ValueError covers json.JSONDecodeError) or not a
        # string at all (TypeError): hand the payload back untouched instead
        # of swallowing unrelated errors with a bare ``except``.
        return {"raw": result_str}

    # Workflow result format: {"output": [...]}
    if isinstance(parsed, dict) and "output" in parsed:
        return parsed["output"]
    return parsed
# ==================== Interview assessments ====================
@router.get("/interviews", response_model=ApiResponse)
async def get_interviews(
    page: int = 1,
    page_size: int = 20,
    session_id: Optional[str] = None
):
    """List interview assessments, optionally restricted to one session.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.
        session_id: When non-empty, only rows for this session are returned
            and pagination is not applied.

    Returns:
        ApiResponse whose ``data`` is the workflow result for the query.
    """
    columns = (
        "session_id, candidate_name, bstudio_create_time, "
        "sales_skill_score, sales_concept_score, competency_score, "
        "final_score_report, current_stage"
    )
    if session_id:
        # session_id is untrusted query input embedded in a SQL string literal:
        # escape backslashes and single quotes so it cannot terminate the
        # literal. (The backend SQL dialect is not visible from this file;
        # this escaping prevents break-out under both backslash-escaping and
        # standard quoting rules.)
        safe_session_id = session_id.replace("\\", "\\\\").replace("'", "''")
        sql = f"""
            SELECT {columns}
            FROM ci_interview_assessments
            WHERE session_id = '{safe_session_id}'
        """
    else:
        # Clamp so a zero/negative page or size cannot yield an invalid
        # negative LIMIT/OFFSET.
        limit = max(page_size, 1)
        offset = (max(page, 1) - 1) * limit
        sql = f"""
            SELECT {columns}
            FROM ci_interview_assessments
            ORDER BY bstudio_create_time DESC
            LIMIT {limit} OFFSET {offset}
        """
    result = await execute_workflow("assessments", sql)
    return ApiResponse(data=result)
@router.get("/interviews/{session_id}", response_model=ApiResponse)
async def get_interview_detail(session_id: str):
    """Fetch the full assessment record for one interview session.

    Args:
        session_id: Session identifier taken from the URL path.

    Returns:
        ApiResponse whose ``data`` is the first matching row when the workflow
        returns a non-empty list, otherwise the workflow result unchanged.
    """
    # The path parameter is untrusted: escape backslashes and single quotes so
    # it cannot break out of the SQL string literal.
    safe_session_id = session_id.replace("\\", "\\\\").replace("'", "''")
    sql = f"SELECT * FROM ci_interview_assessments WHERE session_id = '{safe_session_id}'"
    result = await execute_workflow("assessments", sql)
    # Return only the first record when rows came back.
    if isinstance(result, list) and result:
        return ApiResponse(data=result[0])
    return ApiResponse(data=result)
@router.get("/interviews/{session_id}/logs", response_model=ApiResponse)
async def get_interview_logs(session_id: str):
    """Fetch the question/answer log of one interview session, oldest first.

    Args:
        session_id: Session identifier taken from the URL path.

    Returns:
        ApiResponse whose ``data`` is the workflow result for the log query.
    """
    # The path parameter is untrusted: escape backslashes and single quotes so
    # it cannot break out of the SQL string literal.
    safe_session_id = session_id.replace("\\", "\\\\").replace("'", "''")
    sql = f"""
        SELECT log_id, session_id, stage, round, ai_question, user_answer, log_type, bstudio_create_time
        FROM ci_interview_logs
        WHERE session_id = '{safe_session_id}'
        ORDER BY bstudio_create_time ASC
    """
    result = await execute_workflow("logs", sql)
    return ApiResponse(data=result)
@router.delete("/interviews/{session_id}", response_model=ApiResponse)
async def delete_interview(session_id: str):
    """Delete one interview session's assessment row(s) and its log rows.

    The two deletes are issued as separate workflow calls, so if the second
    call fails the assessment rows have already been removed.

    Args:
        session_id: Session identifier taken from the URL path.

    Returns:
        ApiResponse with a success message and no data.
    """
    # The path parameter is untrusted and ends up inside DELETE statements:
    # without escaping, a value such as ``x' OR '1'='1`` would match every
    # row. Escape backslashes and single quotes so it stays inside the literal.
    safe_session_id = session_id.replace("\\", "\\\\").replace("'", "''")

    # Delete the assessment.
    sql1 = f"DELETE FROM ci_interview_assessments WHERE session_id = '{safe_session_id}'"
    await execute_workflow("assessments", sql1)

    # Delete the conversation logs.
    sql2 = f"DELETE FROM ci_interview_logs WHERE session_id = '{safe_session_id}'"
    await execute_workflow("logs", sql2)

    return ApiResponse(message="删除成功")
# ==================== Business configuration ====================
@router.get("/configs", response_model=ApiResponse)
async def get_configs(config_type: Optional[str] = None):
    """List business configuration entries, optionally filtered by type.

    Args:
        config_type: When non-empty, only entries of this type are returned
            (newest first). Otherwise all entries are returned ordered by
            type, then newest first.

    Returns:
        ApiResponse whose ``data`` is the workflow result for the query.
    """
    if config_type:
        # config_type is untrusted query input: escape backslashes and single
        # quotes so it cannot break out of the SQL string literal.
        safe_config_type = config_type.replace("\\", "\\\\").replace("'", "''")
        sql = f"""
            SELECT config_id, config_type, item_name, content, bstudio_create_time
            FROM ci_business_config
            WHERE config_type = '{safe_config_type}'
            ORDER BY bstudio_create_time DESC
        """
    else:
        sql = """
            SELECT config_id, config_type, item_name, content, bstudio_create_time
            FROM ci_business_config
            ORDER BY config_type, bstudio_create_time DESC
        """
    result = await execute_workflow("config", sql)
    return ApiResponse(data=result)
# ==================== Statistics ====================
@router.get("/stats", response_model=ApiResponse)
async def get_stats():
    """Return aggregate statistics for the admin dashboard.

    Returns:
        ApiResponse whose ``data`` maps ``total_interviews`` to the workflow
        result of a ``COUNT(*)`` over the assessments table.
    """
    # Total number of interviews.
    count_query = "SELECT COUNT(*) as total FROM ci_interview_assessments"
    stats = {
        "total_interviews": await execute_workflow("assessments", count_query),
    }
    return ApiResponse(data=stats)