feat: add admin UI frontend and complete backend APIs
Some checks failed
continuous-integration/drone/push Build is failing

- Add Vue 3 frontend with Element Plus
- Implement login, dashboard, tenant management
- Add app configuration, logs viewer, stats pages
- Add user management for admins
- Update Drone CI to build and deploy frontend
- Frontend ports: 3001 (test), 4001 (prod)
This commit is contained in:
111
2026-01-23 15:48:50 +08:00
parent daa8125c58
commit b89d5ddee9
31 changed files with 3115 additions and 8 deletions

View File

@@ -0,0 +1,89 @@
"""认证服务"""
import bcrypt
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlalchemy.orm import Session
from ..config import get_settings
from ..models.user import User
class TokenData(BaseModel):
"""Token 数据"""
user_id: int
username: str
role: str
class UserInfo(BaseModel):
"""用户信息"""
id: int
username: str
nickname: Optional[str]
role: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
def hash_password(password: str) -> str:
"""哈希密码"""
return bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt()
).decode('utf-8')
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
"""创建 JWT Token"""
settings = get_settings()
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=settings.JWT_EXPIRE_HOURS)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def decode_token(token: str) -> Optional[TokenData]:
"""解析 JWT Token"""
settings = get_settings()
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
return TokenData(
user_id=payload.get("user_id"),
username=payload.get("username"),
role=payload.get("role")
)
except JWTError:
return None
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""认证用户"""
user = db.query(User).filter(User.username == username).first()
if not user:
return None
if not verify_password(password, user.password_hash):
return None
if user.status != 1:
return None
return user
def update_last_login(db: Session, user_id: int):
"""更新最后登录时间"""
db.query(User).filter(User.id == user_id).update(
{"last_login_at": datetime.now()}
)
db.commit()