feat: 静态 Token 鉴权改造
All checks were successful
continuous-integration/drone/push Build is passing

- 将 token_secret 改为 access_token(长期有效)
- 移除 token_required 字段,统一使用 token 验证
- 生成链接简化为 ?tid=xxx&token=xxx 格式
- 前端移除签名验证开关,链接永久有效
This commit is contained in:
111
2026-01-23 18:43:04 +08:00
parent 39f33d7ac5
commit f815b29c51
4 changed files with 759 additions and 791 deletions

View File

@@ -19,8 +19,7 @@ class TenantApp(Base):
wechat_secret_encrypted = Column(Text)
# 鉴权配置
token_secret = Column(String(64))
token_required = Column(SmallInteger, default=0)
access_token = Column(String(64)) # 访问令牌(长期有效)
allowed_origins = Column(Text) # JSON 数组
# 功能权限

View File

@@ -1,281 +1,267 @@
"""应用管理路由"""
import json
import hmac
import hashlib
import time
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.app import App
from ..models.tenant_app import TenantApp
from .auth import get_current_user, require_operator
from ..models.user import User
router = APIRouter(prefix="/apps", tags=["应用管理"])
# ============ Schemas ============
class ToolItem(BaseModel):
"""工具项"""
code: str
name: str
path: str
class AppCreate(BaseModel):
"""创建应用"""
app_code: str
app_name: str
base_url: Optional[str] = None
description: Optional[str] = None
tools: Optional[List[ToolItem]] = None
class AppUpdate(BaseModel):
"""更新应用"""
app_name: Optional[str] = None
base_url: Optional[str] = None
description: Optional[str] = None
tools: Optional[List[ToolItem]] = None
status: Optional[int] = None
class GenerateUrlRequest(BaseModel):
"""生成链接请求"""
tenant_id: str
app_code: str
tool_code: Optional[str] = None # 不传则生成应用首页链接
# ============ API Endpoints ============
@router.get("")
async def list_apps(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
status: Optional[int] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用列表"""
query = db.query(App)
if status is not None:
query = query.filter(App.status == status)
total = query.count()
apps = query.order_by(App.id.asc()).offset((page - 1) * size).limit(size).all()
return {
"total": total,
"page": page,
"size": size,
"items": [format_app(app) for app in apps]
}
@router.get("/all")
async def list_all_apps(
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取所有启用的应用(用于下拉选择)"""
apps = db.query(App).filter(App.status == 1).order_by(App.id.asc()).all()
return [{"app_code": app.app_code, "app_name": app.app_name} for app in apps]
@router.get("/{app_id}")
async def get_app(
app_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用详情"""
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
return format_app(app)
@router.post("")
async def create_app(
data: AppCreate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""创建应用"""
# 检查 app_code 是否重复
exists = db.query(App).filter(App.app_code == data.app_code).first()
if exists:
raise HTTPException(status_code=400, detail="应用代码已存在")
app = App(
app_code=data.app_code,
app_name=data.app_name,
base_url=data.base_url,
description=data.description,
tools=json.dumps([t.model_dump() for t in data.tools], ensure_ascii=False) if data.tools else None,
status=1
)
db.add(app)
db.commit()
db.refresh(app)
return {"success": True, "id": app.id}
@router.put("/{app_id}")
async def update_app(
app_id: int,
data: AppUpdate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""更新应用"""
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
update_data = data.model_dump(exclude_unset=True)
# 处理 tools JSON
if 'tools' in update_data:
if update_data['tools']:
update_data['tools'] = json.dumps([t.model_dump() if hasattr(t, 'model_dump') else t for t in update_data['tools']], ensure_ascii=False)
else:
update_data['tools'] = None
for key, value in update_data.items():
setattr(app, key, value)
db.commit()
return {"success": True}
@router.delete("/{app_id}")
async def delete_app(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""删除应用"""
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
# 检查是否有租户在使用
tenant_count = db.query(TenantApp).filter(TenantApp.app_code == app.app_code).count()
if tenant_count > 0:
raise HTTPException(status_code=400, detail=f"{tenant_count} 个租户正在使用此应用,无法删除")
db.delete(app)
db.commit()
return {"success": True}
@router.post("/generate-url")
async def generate_signed_url(
data: GenerateUrlRequest,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
生成带签名的访问链接
返回完整的可直接使用的 URL
"""
# 获取应用信息
app = db.query(App).filter(App.app_code == data.app_code, App.status == 1).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在或已禁用")
if not app.base_url:
raise HTTPException(status_code=400, detail="应用未配置基础URL")
# 获取租户配置
tenant_app = db.query(TenantApp).filter(
TenantApp.tenant_id == data.tenant_id,
TenantApp.app_code == data.app_code,
TenantApp.status == 1
).first()
if not tenant_app:
raise HTTPException(status_code=404, detail="租户未配置此应用")
# 构建基础 URL
base_url = app.base_url.rstrip('/')
if data.tool_code:
# 查找工具路径
tools = json.loads(app.tools) if app.tools else []
tool = next((t for t in tools if t.get('code') == data.tool_code), None)
if tool:
base_url = f"{base_url}{tool.get('path', '')}"
else:
base_url = f"{base_url}/{data.tool_code}"
# 构建参数
params = {
"tid": data.tenant_id,
"aid": data.app_code
}
# 如果需要签名
if tenant_app.token_required and tenant_app.token_secret:
ts = str(int(time.time()))
message = f"{data.tenant_id}{data.app_code}{ts}"
sign = hmac.new(
tenant_app.token_secret.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
params["ts"] = ts
params["sign"] = sign
# 组装 URL
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
full_url = f"{base_url}?{query_string}"
return {
"success": True,
"url": full_url,
"params": params,
"token_required": bool(tenant_app.token_required),
"expires_in": 300 if tenant_app.token_required else None, # 签名5分钟有效
"note": "签名链接5分钟内有效过期需重新生成" if tenant_app.token_required else "免签名链接,长期有效"
}
@router.get("/{app_code}/tools")
async def get_app_tools(
app_code: str,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用的工具列表(用于配置权限时选择)"""
app = db.query(App).filter(App.app_code == app_code).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
tools = json.loads(app.tools) if app.tools else []
return tools
def format_app(app: App) -> dict:
"""格式化应用数据"""
return {
"id": app.id,
"app_code": app.app_code,
"app_name": app.app_name,
"base_url": app.base_url,
"description": app.description,
"tools": json.loads(app.tools) if app.tools else [],
"status": app.status,
"created_at": app.created_at,
"updated_at": app.updated_at
}
"""应用管理路由"""
import json
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.app import App
from ..models.tenant_app import TenantApp
from .auth import get_current_user, require_operator
from ..models.user import User
router = APIRouter(prefix="/apps", tags=["应用管理"])
# ============ Schemas ============
class ToolItem(BaseModel):
"""工具项"""
code: str
name: str
path: str
class AppCreate(BaseModel):
"""创建应用"""
app_code: str
app_name: str
base_url: Optional[str] = None
description: Optional[str] = None
tools: Optional[List[ToolItem]] = None
class AppUpdate(BaseModel):
"""更新应用"""
app_name: Optional[str] = None
base_url: Optional[str] = None
description: Optional[str] = None
tools: Optional[List[ToolItem]] = None
status: Optional[int] = None
class GenerateUrlRequest(BaseModel):
"""生成链接请求"""
tenant_id: str
app_code: str
tool_code: Optional[str] = None # 不传则生成应用首页链接
# ============ API Endpoints ============
@router.get("")
async def list_apps(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
status: Optional[int] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用列表"""
query = db.query(App)
if status is not None:
query = query.filter(App.status == status)
total = query.count()
apps = query.order_by(App.id.asc()).offset((page - 1) * size).limit(size).all()
return {
"total": total,
"page": page,
"size": size,
"items": [format_app(app) for app in apps]
}
@router.get("/all")
async def list_all_apps(
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取所有启用的应用(用于下拉选择)"""
apps = db.query(App).filter(App.status == 1).order_by(App.id.asc()).all()
return [{"app_code": app.app_code, "app_name": app.app_name} for app in apps]
@router.get("/{app_id}")
async def get_app(
app_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用详情"""
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
return format_app(app)
@router.post("")
async def create_app(
data: AppCreate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""创建应用"""
# 检查 app_code 是否重复
exists = db.query(App).filter(App.app_code == data.app_code).first()
if exists:
raise HTTPException(status_code=400, detail="应用代码已存在")
app = App(
app_code=data.app_code,
app_name=data.app_name,
base_url=data.base_url,
description=data.description,
tools=json.dumps([t.model_dump() for t in data.tools], ensure_ascii=False) if data.tools else None,
status=1
)
db.add(app)
db.commit()
db.refresh(app)
return {"success": True, "id": app.id}
@router.put("/{app_id}")
async def update_app(
app_id: int,
data: AppUpdate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""更新应用"""
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
update_data = data.model_dump(exclude_unset=True)
# 处理 tools JSON
if 'tools' in update_data:
if update_data['tools']:
update_data['tools'] = json.dumps([t.model_dump() if hasattr(t, 'model_dump') else t for t in update_data['tools']], ensure_ascii=False)
else:
update_data['tools'] = None
for key, value in update_data.items():
setattr(app, key, value)
db.commit()
return {"success": True}
@router.delete("/{app_id}")
async def delete_app(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""删除应用"""
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
# 检查是否有租户在使用
tenant_count = db.query(TenantApp).filter(TenantApp.app_code == app.app_code).count()
if tenant_count > 0:
raise HTTPException(status_code=400, detail=f"{tenant_count}租户在使用此应用,无法删除")
db.delete(app)
db.commit()
return {"success": True}
@router.post("/generate-url")
async def generate_url(
data: GenerateUrlRequest,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
生成访问链接
返回完整的可直接使用的 URL使用静态 token长期有效
"""
# 获取应用信息
app = db.query(App).filter(App.app_code == data.app_code, App.status == 1).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在或已禁用")
if not app.base_url:
raise HTTPException(status_code=400, detail="应用未配置基础URL")
# 获取租户配置
tenant_app = db.query(TenantApp).filter(
TenantApp.tenant_id == data.tenant_id,
TenantApp.app_code == data.app_code,
TenantApp.status == 1
).first()
if not tenant_app:
raise HTTPException(status_code=404, detail="租户未配置此应用")
if not tenant_app.access_token:
raise HTTPException(status_code=400, detail="租户应用未配置访问令牌")
# 构建基础 URL
base_url = app.base_url.rstrip('/')
if data.tool_code:
# 查找工具路径
tools = json.loads(app.tools) if app.tools else []
tool = next((t for t in tools if t.get('code') == data.tool_code), None)
if tool:
base_url = f"{base_url}{tool.get('path', '')}"
else:
base_url = f"{base_url}/{data.tool_code}"
# 构建参数(静态 token长期有效
params = {
"tid": data.tenant_id,
"token": tenant_app.access_token
}
# 组装 URL
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
full_url = f"{base_url}?{query_string}"
return {
"success": True,
"url": full_url,
"params": params,
"note": "静态链接,长期有效"
}
@router.get("/{app_code}/tools")
async def get_app_tools(
app_code: str,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用的工具列表(用于配置权限时选择)"""
app = db.query(App).filter(App.app_code == app_code).first()
if not app:
raise HTTPException(status_code=404, detail="应用不存在")
tools = json.loads(app.tools) if app.tools else []
return tools
def format_app(app: App) -> dict:
"""格式化应用数据"""
return {
"id": app.id,
"app_code": app.app_code,
"app_name": app.app_name,
"base_url": app.base_url,
"description": app.description,
"tools": json.loads(app.tools) if app.tools else [],
"status": app.status,
"created_at": app.created_at,
"updated_at": app.updated_at
}

View File

@@ -24,8 +24,7 @@ class TenantAppCreate(BaseModel):
wechat_corp_id: Optional[str] = None
wechat_agent_id: Optional[str] = None
wechat_secret: Optional[str] = None # 明文,存储时加密
token_secret: Optional[str] = None # 如果不传则自动生成
token_required: bool = False
access_token: Optional[str] = None # 如果不传则自动生成
allowed_origins: Optional[List[str]] = None
allowed_tools: Optional[List[str]] = None
@@ -35,8 +34,7 @@ class TenantAppUpdate(BaseModel):
wechat_corp_id: Optional[str] = None
wechat_agent_id: Optional[str] = None
wechat_secret: Optional[str] = None
token_secret: Optional[str] = None
token_required: Optional[bool] = None
access_token: Optional[str] = None
allowed_origins: Optional[List[str]] = None
allowed_tools: Optional[List[str]] = None
status: Optional[int] = None
@@ -101,8 +99,8 @@ async def create_tenant_app(
if exists:
raise HTTPException(status_code=400, detail="该租户应用配置已存在")
# 自动生成 token_secret
token_secret = data.token_secret or secrets.token_hex(32)
# 自动生成 access_token
access_token = data.access_token or secrets.token_hex(32)
# 加密 wechat_secret
wechat_secret_encrypted = None
@@ -116,8 +114,7 @@ async def create_tenant_app(
wechat_corp_id=data.wechat_corp_id,
wechat_agent_id=data.wechat_agent_id,
wechat_secret_encrypted=wechat_secret_encrypted,
token_secret=token_secret,
token_required=1 if data.token_required else 0,
access_token=access_token,
allowed_origins=json.dumps(data.allowed_origins) if data.allowed_origins else None,
allowed_tools=json.dumps(data.allowed_tools) if data.allowed_tools else None,
status=1
@@ -126,7 +123,7 @@ async def create_tenant_app(
db.commit()
db.refresh(app)
return {"success": True, "id": app.id, "token_secret": token_secret}
return {"success": True, "id": app.id, "access_token": access_token}
@router.put("/{app_id}")
@@ -155,10 +152,6 @@ async def update_tenant_app(
if 'allowed_tools' in update_data:
update_data['allowed_tools'] = json.dumps(update_data['allowed_tools']) if update_data['allowed_tools'] else None
# 处理 token_required
if 'token_required' in update_data:
update_data['token_required'] = 1 if update_data['token_required'] else 0
for key, value in update_data.items():
setattr(app, key, value)
@@ -189,16 +182,16 @@ async def regenerate_token(
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""重新生成 token_secret"""
"""重新生成 access_token"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
new_token = secrets.token_hex(32)
app.token_secret = new_token
app.access_token = new_token
db.commit()
return {"success": True, "token_secret": new_token}
return {"success": True, "access_token": new_token}
@router.get("/{app_id}/wechat-secret")
@@ -229,8 +222,7 @@ def format_tenant_app(app: TenantApp, mask_secret: bool = True) -> dict:
"wechat_corp_id": app.wechat_corp_id,
"wechat_agent_id": app.wechat_agent_id,
"has_wechat_secret": bool(app.wechat_secret_encrypted),
"token_secret": "******" if mask_secret and app.token_secret else app.token_secret,
"token_required": bool(app.token_required),
"access_token": "******" if mask_secret and app.access_token else app.access_token,
"allowed_origins": json.loads(app.allowed_origins) if app.allowed_origins else [],
"allowed_tools": json.loads(app.allowed_tools) if app.allowed_tools else [],
"status": app.status,