"""认证服务""" import bcrypt from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from pydantic import BaseModel from sqlalchemy.orm import Session from ..config import get_settings from ..models.user import User class TokenData(BaseModel): """Token 数据""" user_id: int username: str role: str class UserInfo(BaseModel): """用户信息""" id: int username: str nickname: Optional[str] role: str def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return bcrypt.checkpw( plain_password.encode('utf-8'), hashed_password.encode('utf-8') ) def hash_password(password: str) -> str: """哈希密码""" return bcrypt.hashpw( password.encode('utf-8'), bcrypt.gensalt() ).decode('utf-8') def create_access_token(data: dict, expires_delta: timedelta = None) -> str: """创建 JWT Token""" settings = get_settings() to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(hours=settings.JWT_EXPIRE_HOURS) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) def decode_token(token: str) -> Optional[TokenData]: """解析 JWT Token""" settings = get_settings() try: payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) return TokenData( user_id=payload.get("user_id"), username=payload.get("username"), role=payload.get("role") ) except JWTError: return None def authenticate_user(db: Session, username: str, password: str) -> Optional[User]: """认证用户""" user = db.query(User).filter(User.username == username).first() if not user: return None if not verify_password(password, user.password_hash): return None if user.status != 1: return None return user def update_last_login(db: Session, user_id: int): """更新最后登录时间""" db.query(User).filter(User.id == user_id).update( {"last_login_at": datetime.now()} ) db.commit()