"""统计上报路由""" from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, Header, HTTPException, Query from sqlalchemy.orm import Session from sqlalchemy import func from ..database import get_db from ..config import get_settings from ..models.stats import AICallEvent from ..schemas.stats import AICallEventCreate, AICallEventResponse, BatchReportRequest from ..services.auth import decode_token router = APIRouter(prefix="/stats", tags=["stats"]) settings = get_settings() def get_current_user_optional(authorization: Optional[str] = Header(None)): """可选的用户认证""" if authorization and authorization.startswith("Bearer "): token = authorization[7:] return decode_token(token) return None def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")): """验证API Key""" if x_api_key != settings.API_KEY: raise HTTPException(status_code=401, detail="Invalid API Key") return x_api_key @router.post("/report", response_model=AICallEventResponse) async def report_ai_call( event: AICallEventCreate, db: Session = Depends(get_db), _: str = Depends(verify_api_key) ): """上报AI调用事件""" db_event = AICallEvent(**event.model_dump()) db.add(db_event) db.commit() db.refresh(db_event) return db_event @router.post("/report/batch") async def batch_report_ai_calls( request: BatchReportRequest, db: Session = Depends(get_db), _: str = Depends(verify_api_key) ): """批量上报AI调用事件""" events = [AICallEvent(**e.model_dump()) for e in request.events] db.add_all(events) db.commit() return {"success": True, "count": len(events)} @router.get("/summary") async def get_stats_summary( db: Session = Depends(get_db), user = Depends(get_current_user_optional) ): """获取统计摘要(用于仪表盘)""" today = datetime.now().date() # 今日调用次数和 token 消耗 today_stats = db.query( func.count(AICallEvent.id).label('calls'), func.coalesce(func.sum(AICallEvent.input_tokens + AICallEvent.output_tokens), 0).label('tokens') ).filter( func.date(AICallEvent.created_at) == today ).first() # 本周数据 week_start = today - timedelta(days=today.weekday()) week_stats = db.query( func.count(AICallEvent.id).label('calls'), func.coalesce(func.sum(AICallEvent.input_tokens + AICallEvent.output_tokens), 0).label('tokens') ).filter( func.date(AICallEvent.created_at) >= week_start ).first() return { "today_calls": today_stats.calls if today_stats else 0, "today_tokens": int(today_stats.tokens) if today_stats else 0, "week_calls": week_stats.calls if week_stats else 0, "week_tokens": int(week_stats.tokens) if week_stats else 0 } @router.get("/trend") async def get_stats_trend( days: int = Query(7, ge=1, le=30), tenant_id: Optional[str] = None, db: Session = Depends(get_db), user = Depends(get_current_user_optional) ): """获取调用趋势数据""" end_date = datetime.now().date() start_date = end_date - timedelta(days=days-1) query = db.query( func.date(AICallEvent.created_at).label('date'), func.count(AICallEvent.id).label('calls'), func.coalesce(func.sum(AICallEvent.input_tokens + AICallEvent.output_tokens), 0).label('tokens') ).filter( func.date(AICallEvent.created_at) >= start_date, func.date(AICallEvent.created_at) <= end_date ) if tenant_id: query = query.filter(AICallEvent.tenant_id == tenant_id) results = query.group_by(func.date(AICallEvent.created_at)).all() # 转换为字典便于查找 data_map = {str(r.date): {"calls": r.calls, "tokens": int(r.tokens)} for r in results} # 填充所有日期 trend = [] current = start_date while current <= end_date: date_str = str(current) trend.append({ "date": date_str, "calls": data_map.get(date_str, {}).get("calls", 0), "tokens": data_map.get(date_str, {}).get("tokens", 0) }) current += timedelta(days=1) return {"trend": trend}