#!/usr/bin/env python3 """ 清理宝塔旧任务后再构建 """ import requests import time import hashlib import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) BT_PANEL = "http://47.107.172.23:8888" BT_API_KEY = "PKdfnaInQL0P5ghB8SvwbrGcIpXWaEvq" DEPLOY_PATH = "/www/wwwroot/ai-interview" def bt_api(action, data=None, timeout=60): if data is None: data = {} request_time = int(time.time()) request_token = hashlib.md5( f"{request_time}{hashlib.md5(BT_API_KEY.encode()).hexdigest()}".encode() ).hexdigest() data['request_time'] = request_time data['request_token'] = request_token url = f"{BT_PANEL}/{action}" try: response = requests.post(url, data=data, timeout=timeout, verify=False) try: return response.json() except: return {"status": True, "msg": response.text[:2000]} except Exception as e: return {"status": False, "msg": str(e)} def cleanup_old_tasks(): """清理所有旧的计划任务""" print("🧹 清理旧计划任务...") result = bt_api("crontab?action=GetCrontab", {"page": 1, "limit": 100}) deleted = 0 if isinstance(result, list): for task in result: if isinstance(task, dict) and task.get("id"): name = task.get("name", "") # 删除我们创建的临时任务 if any(x in name for x in ["upload_", "build_", "cleanup", "restart", "check", "create", "update", "rebuild"]): bt_api("crontab?action=DelCrontab", {"id": task["id"]}) deleted += 1 print(f" 删除: {name}") elif isinstance(result, dict) and result.get("data"): for task in result["data"]: if isinstance(task, dict) and task.get("id"): name = task.get("name", "") if any(x in name for x in ["upload_", "build_", "cleanup", "restart", "check", "create", "update", "rebuild"]): bt_api("crontab?action=DelCrontab", {"id": task["id"]}) deleted += 1 print(f" 删除: {name}") print(f" 已删除 {deleted} 个旧任务") return deleted def run_task(shell_body, task_name, wait_time=30): result = bt_api("crontab?action=AddCrontab", { "name": task_name, "type": "minute-n", "where1": "1", "sType": "toShell", "sBody": shell_body, }) if not result.get("status") or not result.get("id"): print(f" 创建任务失败: {result}") return None cron_id = result["id"] print(f" 任务 {cron_id} 已创建") bt_api("crontab?action=StartTask", {"id": cron_id}, timeout=600) print(f" 执行中,等待 {wait_time}s...") time.sleep(wait_time) log_result = bt_api("crontab?action=GetLogs", {"id": cron_id}) bt_api("crontab?action=DelCrontab", {"id": cron_id}) return log_result def main(): print("=" * 60) print("🚀 清理并构建") print("=" * 60) # 1. 清理旧任务 cleanup_old_tasks() time.sleep(2) # 2. 构建 print("\n🐳 构建 Docker 容器...") build_script = f"""#!/bin/bash cd {DEPLOY_PATH}/deploy # 清理空间 docker system prune -af 2>/dev/null || true # 构建 docker-compose up -d --build 2>&1 sleep 15 echo "容器状态:" docker ps -a --format 'table {{{{.Names}}}}\\t{{{{.Status}}}}\\t{{{{.Ports}}}}' echo "" echo "后端日志:" docker logs --tail 20 ai-interview-backend 2>&1 echo "" echo "测试:" curl -s http://127.0.0.1:8000/health || echo "后端未响应" curl -s -o /dev/null -w " 前端:%{{http_code}}" http://127.0.0.1:3000 """ result = run_task(build_script, "deploy", wait_time=180) if result and result.get("msg"): print("\n📋 结果:") print("-" * 60) print(result["msg"]) print("\n" + "=" * 60) print("🌐 http://interview.test.ai.ireborn.com.cn") if __name__ == "__main__": main()