"""应用管理路由""" import json import hmac import hashlib import time from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from typing import Optional, List from sqlalchemy.orm import Session from ..database import get_db from ..models.app import App from ..models.tenant_app import TenantApp from .auth import get_current_user, require_operator from ..models.user import User router = APIRouter(prefix="/apps", tags=["应用管理"]) # ============ Schemas ============ class ToolItem(BaseModel): """工具项""" code: str name: str path: str class AppCreate(BaseModel): """创建应用""" app_code: str app_name: str base_url: Optional[str] = None description: Optional[str] = None tools: Optional[List[ToolItem]] = None class AppUpdate(BaseModel): """更新应用""" app_name: Optional[str] = None base_url: Optional[str] = None description: Optional[str] = None tools: Optional[List[ToolItem]] = None status: Optional[int] = None class GenerateUrlRequest(BaseModel): """生成链接请求""" tenant_id: str app_code: str tool_code: Optional[str] = None # 不传则生成应用首页链接 # ============ API Endpoints ============ @router.get("") async def list_apps( page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), status: Optional[int] = None, user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """获取应用列表""" query = db.query(App) if status is not None: query = query.filter(App.status == status) total = query.count() apps = query.order_by(App.id.asc()).offset((page - 1) * size).limit(size).all() return { "total": total, "page": page, "size": size, "items": [format_app(app) for app in apps] } @router.get("/all") async def list_all_apps( user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """获取所有启用的应用(用于下拉选择)""" apps = db.query(App).filter(App.status == 1).order_by(App.id.asc()).all() return [{"app_code": app.app_code, "app_name": app.app_name} for app in apps] @router.get("/{app_id}") async def get_app( app_id: int, user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """获取应用详情""" app = db.query(App).filter(App.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用不存在") return format_app(app) @router.post("") async def create_app( data: AppCreate, user: User = Depends(require_operator), db: Session = Depends(get_db) ): """创建应用""" # 检查 app_code 是否重复 exists = db.query(App).filter(App.app_code == data.app_code).first() if exists: raise HTTPException(status_code=400, detail="应用代码已存在") app = App( app_code=data.app_code, app_name=data.app_name, base_url=data.base_url, description=data.description, tools=json.dumps([t.model_dump() for t in data.tools], ensure_ascii=False) if data.tools else None, status=1 ) db.add(app) db.commit() db.refresh(app) return {"success": True, "id": app.id} @router.put("/{app_id}") async def update_app( app_id: int, data: AppUpdate, user: User = Depends(require_operator), db: Session = Depends(get_db) ): """更新应用""" app = db.query(App).filter(App.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用不存在") update_data = data.model_dump(exclude_unset=True) # 处理 tools JSON if 'tools' in update_data: if update_data['tools']: update_data['tools'] = json.dumps([t.model_dump() if hasattr(t, 'model_dump') else t for t in update_data['tools']], ensure_ascii=False) else: update_data['tools'] = None for key, value in update_data.items(): setattr(app, key, value) db.commit() return {"success": True} @router.delete("/{app_id}") async def delete_app( app_id: int, user: User = Depends(require_operator), db: Session = Depends(get_db) ): """删除应用""" app = db.query(App).filter(App.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用不存在") # 检查是否有租户在使用 tenant_count = db.query(TenantApp).filter(TenantApp.app_code == app.app_code).count() if tenant_count > 0: raise HTTPException(status_code=400, detail=f"有 {tenant_count} 个租户正在使用此应用,无法删除") db.delete(app) db.commit() return {"success": True} @router.post("/generate-url") async def generate_signed_url( data: GenerateUrlRequest, user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ 生成带签名的访问链接 返回完整的可直接使用的 URL """ # 获取应用信息 app = db.query(App).filter(App.app_code == data.app_code, App.status == 1).first() if not app: raise HTTPException(status_code=404, detail="应用不存在或已禁用") if not app.base_url: raise HTTPException(status_code=400, detail="应用未配置基础URL") # 获取租户配置 tenant_app = db.query(TenantApp).filter( TenantApp.tenant_id == data.tenant_id, TenantApp.app_code == data.app_code, TenantApp.status == 1 ).first() if not tenant_app: raise HTTPException(status_code=404, detail="租户未配置此应用") # 构建基础 URL base_url = app.base_url.rstrip('/') if data.tool_code: # 查找工具路径 tools = json.loads(app.tools) if app.tools else [] tool = next((t for t in tools if t.get('code') == data.tool_code), None) if tool: base_url = f"{base_url}{tool.get('path', '')}" else: base_url = f"{base_url}/{data.tool_code}" # 构建参数 params = { "tid": data.tenant_id, "aid": data.app_code } # 如果需要签名 if tenant_app.token_required and tenant_app.token_secret: ts = str(int(time.time())) message = f"{data.tenant_id}{data.app_code}{ts}" sign = hmac.new( tenant_app.token_secret.encode(), message.encode(), hashlib.sha256 ).hexdigest() params["ts"] = ts params["sign"] = sign # 组装 URL query_string = "&".join([f"{k}={v}" for k, v in params.items()]) full_url = f"{base_url}?{query_string}" return { "success": True, "url": full_url, "params": params, "token_required": bool(tenant_app.token_required), "expires_in": 300 if tenant_app.token_required else None, # 签名5分钟有效 "note": "签名链接5分钟内有效,过期需重新生成" if tenant_app.token_required else "免签名链接,长期有效" } @router.get("/{app_code}/tools") async def get_app_tools( app_code: str, user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """获取应用的工具列表(用于配置权限时选择)""" app = db.query(App).filter(App.app_code == app_code).first() if not app: raise HTTPException(status_code=404, detail="应用不存在") tools = json.loads(app.tools) if app.tools else [] return tools def format_app(app: App) -> dict: """格式化应用数据""" return { "id": app.id, "app_code": app.app_code, "app_name": app.app_name, "base_url": app.base_url, "description": app.description, "tools": json.loads(app.tools) if app.tools else [], "status": app.status, "created_at": app.created_at, "updated_at": app.updated_at }