Files
000-platform/backend/app/routers/stats.py
111 b018844078
All checks were successful
continuous-integration/drone/push Build is passing
fix: add GET endpoints for stats summary and logs query
2026-01-23 16:12:18 +08:00

134 lines
4.3 KiB
Python

"""统计上报路由"""
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import func
from ..database import get_db
from ..config import get_settings
from ..models.stats import AICallEvent
from ..schemas.stats import AICallEventCreate, AICallEventResponse, BatchReportRequest
from ..services.auth import decode_token
# Every route declared on this router is served under the "/stats" prefix.
router = APIRouter(prefix="/stats", tags=["stats"])
# Resolved once when the module is imported; verify_api_key reads API_KEY from it.
settings = get_settings()
def get_current_user_optional(authorization: Optional[str] = Header(None)):
    """Resolve the caller from an optional ``Authorization: Bearer`` header.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None`` when
            the request did not send one.

    Returns:
        Whatever ``decode_token`` returns for the bearer token, or ``None``
        when the header is missing or does not use the Bearer scheme.
    """
    if not authorization:
        return None

    # HTTP authentication scheme names are case-insensitive, so "bearer xyz"
    # and "BEARER xyz" must be accepted just like "Bearer xyz".
    scheme, separator, token = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        return None

    return decode_token(token)
def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")):
    """Validate the ``X-API-Key`` request header against the configured key.

    Args:
        x_api_key: Value of the required ``X-API-Key`` header.

    Returns:
        The supplied key, unchanged, when it matches ``settings.API_KEY``.

    Raises:
        HTTPException: 401 with detail ``"Invalid API Key"`` when the key
            does not match.
    """
    import secrets

    expected = settings.API_KEY
    # A configured key that is not a string can never equal the header value,
    # so it is rejected up front (same outcome as the previous `!=` check).
    # Comparing UTF-8 bytes with compare_digest keeps the check constant-time
    # and also works for non-ASCII keys, which compare_digest rejects as str.
    if not isinstance(expected, str) or not secrets.compare_digest(
        x_api_key.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return x_api_key
@router.post("/report", response_model=AICallEventResponse)
async def report_ai_call(
    event: AICallEventCreate,
    db: Session = Depends(get_db),
    _: str = Depends(verify_api_key),
):
    """Store one reported AI call event and return the stored record.

    Args:
        event: Validated payload describing the AI call.
        db: Database session supplied by the ``get_db`` dependency.
        _: Result of the API-key check; present only to enforce it.

    Returns:
        The ``AICallEvent`` instance after it has been committed and
        refreshed from the database.
    """
    # NOTE(review): `Session` is SQLAlchemy's synchronous session, so the
    # commit below runs inside this `async def` handler -- confirm that is
    # acceptable for the expected load.
    payload = event.model_dump()
    record = AICallEvent(**payload)

    db.add(record)
    db.commit()
    # Reload the instance so the response reflects what was actually stored.
    db.refresh(record)
    return record
@router.post("/report/batch")
async def batch_report_ai_calls(
    request: BatchReportRequest,
    db: Session = Depends(get_db),
    _: str = Depends(verify_api_key),
):
    """Store a batch of reported AI call events in a single commit.

    Args:
        request: Batch payload; its ``events`` are converted one by one.
        db: Database session supplied by the ``get_db`` dependency.
        _: Result of the API-key check; present only to enforce it.

    Returns:
        A dict with ``"success": True`` and ``"count"`` set to the number of
        events that were added.
    """
    # Lazily dump each incoming event, then build the ORM rows from the dumps.
    payloads = (item.model_dump() for item in request.events)
    records = [AICallEvent(**payload) for payload in payloads]

    db.add_all(records)
    db.commit()

    stored = len(records)
    return {"success": True, "count": stored}
@router.get("/summary")
async def get_stats_summary(
    db: Session = Depends(get_db),
    user=Depends(get_current_user_optional),
):
    """Return call and token totals for today and for the current week.

    "Today" is ``datetime.now().date()``; the week starts on the most recent
    Monday (``date.weekday()`` is 0 on Mondays). The ``user`` dependency is
    resolved but not read by this handler.

    Returns:
        A dict with ``today_calls``, ``today_tokens``, ``week_calls`` and
        ``week_tokens``.
    """
    def _totals(row):
        # Fall back to zeros if the aggregate query produced no row.
        if not row:
            return 0, 0
        return row.calls, int(row.tokens)

    current_day = datetime.now().date()
    monday = current_day - timedelta(days=current_day.weekday())

    # Both queries select the same aggregates; only the date filter differs.
    event_day = func.date(AICallEvent.created_at)
    aggregates = (
        func.count(AICallEvent.id).label('calls'),
        func.coalesce(func.sum(AICallEvent.total_tokens), 0).label('tokens'),
    )

    # Today's call count and token consumption.
    day_row = db.query(*aggregates).filter(event_day == current_day).first()
    # Everything recorded since the start of this week.
    week_row = db.query(*aggregates).filter(event_day >= monday).first()

    today_calls, today_tokens = _totals(day_row)
    week_calls, week_tokens = _totals(week_row)
    return {
        "today_calls": today_calls,
        "today_tokens": today_tokens,
        "week_calls": week_calls,
        "week_tokens": week_tokens
    }
@router.get("/trend")
async def get_stats_trend(
    days: int = Query(7, ge=1, le=30),
    tenant_id: Optional[str] = None,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_optional),
):
    """Return per-day call and token totals for the last ``days`` days.

    The window ends on ``datetime.now().date()`` and spans ``days`` calendar
    days including that day. Days without any recorded events are still
    listed, with zero counts. The ``user`` dependency is resolved but not
    read by this handler.

    Args:
        days: Window length in days (validated to 1-30 by the query spec).
        tenant_id: When given and non-empty, only that tenant's events count.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"trend": [...]}`` where each item has ``date`` (string), ``calls``
        and ``tokens``, ordered from the oldest day to the newest.
    """
    last_day = datetime.now().date()
    first_day = last_day - timedelta(days=days - 1)

    event_day = func.date(AICallEvent.created_at)
    daily = db.query(
        event_day.label('date'),
        func.count(AICallEvent.id).label('calls'),
        func.coalesce(func.sum(AICallEvent.total_tokens), 0).label('tokens'),
    ).filter(
        event_day >= first_day,
        event_day <= last_day,
    )
    if tenant_id:
        daily = daily.filter(AICallEvent.tenant_id == tenant_id)
    rows = daily.group_by(event_day).all()

    # Index the aggregated rows by their date string for direct lookup.
    totals_by_day = {
        str(row.date): (row.calls, int(row.tokens)) for row in rows
    }

    # Emit one entry per calendar day so gaps show up as zeros.
    trend = []
    for offset in range(days):
        day_key = str(first_day + timedelta(days=offset))
        calls, tokens = totals_by_day.get(day_key, (0, 0))
        trend.append({"date": day_key, "calls": calls, "tokens": tokens})
    return {"trend": trend}