#!/usr/bin/env python3 """ 通过宝塔计划任务创建文件 """ import requests import time import hashlib import os import base64 # 禁用 SSL 警告 import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 宝塔配置 BT_PANEL = "http://47.107.172.23:8888" BT_API_KEY = "PKdfnaInQL0P5ghB8SvwbrGcIpXWaEvq" DEPLOY_PATH = "/www/wwwroot/ai-interview" LOCAL_BACKEND = "/Users/a111/Documents/AgentWD/projects/011-ai-interview-2601/backend" LOCAL_DEPLOY = "/Users/a111/Documents/AgentWD/projects/011-ai-interview-2601/deploy" LOCAL_FRONTEND = "/Users/a111/Documents/AgentWD/projects/011-ai-interview-2601/frontend" def bt_api(action, data=None, timeout=60): """调用宝塔 API""" if data is None: data = {} request_time = int(time.time()) request_token = hashlib.md5( f"{request_time}{hashlib.md5(BT_API_KEY.encode()).hexdigest()}".encode() ).hexdigest() data['request_time'] = request_time data['request_token'] = request_token url = f"{BT_PANEL}/{action}" try: response = requests.post(url, data=data, timeout=timeout, verify=False) try: return response.json() except: return {"status": True, "msg": response.text[:1000]} except Exception as e: return {"status": False, "msg": str(e)} def run_task(shell_body, task_name, wait_time=30): """创建计划任务并执行""" result = bt_api("crontab?action=AddCrontab", { "name": task_name, "type": "minute-n", "where1": "1", "hour": "", "minute": "", "week": "", "sType": "toShell", "sBody": shell_body, "sName": "", "backupTo": "", "save": "", "urladdress": "" }) if not result.get("status") or not result.get("id"): print(f" 创建任务失败: {result}") return None cron_id = result["id"] bt_api("crontab?action=StartTask", {"id": cron_id}, timeout=300) print(f" 任务 {cron_id} 已启动,等待 {wait_time} 秒...") time.sleep(wait_time) log_result = bt_api("crontab?action=GetLogs", {"id": cron_id}) bt_api("crontab?action=DelCrontab", {"id": cron_id}) return log_result def create_file_via_shell(remote_path, content): """通过 shell 命令创建文件""" # 使用 base64 编码内容以避免特殊字符问题 encoded = base64.b64encode(content.encode('utf-8')).decode('utf-8') # 确保目录存在,然后写入文件 dir_path = os.path.dirname(remote_path) shell_script = f"""#!/bin/bash mkdir -p {dir_path} echo '{encoded}' | base64 -d > {remote_path} echo "文件已创建: {remote_path}" ls -la {remote_path} """ return shell_script def main(): print("=" * 60) print("📤 创建所有必需文件") print("=" * 60) print() # 读取所有需要上传的文件 files_to_create = [] # 1. requirements.txt with open(os.path.join(LOCAL_BACKEND, "requirements.txt"), 'r') as f: files_to_create.append((f"{DEPLOY_PATH}/backend/requirements.txt", f.read())) # 2. Dockerfile.frontend with open(os.path.join(LOCAL_DEPLOY, "Dockerfile.frontend"), 'r') as f: files_to_create.append((f"{DEPLOY_PATH}/deploy/Dockerfile.frontend", f.read())) # 批量创建文件 print("📁 创建目录结构和文件...") # 合并所有文件创建命令 all_commands = f"""#!/bin/bash mkdir -p {DEPLOY_PATH}/backend mkdir -p {DEPLOY_PATH}/backend/app/routers mkdir -p {DEPLOY_PATH}/backend/app/services mkdir -p {DEPLOY_PATH}/deploy/nginx mkdir -p {DEPLOY_PATH}/frontend mkdir -p {DEPLOY_PATH}/deploy/uploads """ for remote_path, content in files_to_create: encoded = base64.b64encode(content.encode('utf-8')).decode('utf-8') all_commands += f""" echo "创建: {remote_path}" echo '{encoded}' | base64 -d > {remote_path} """ all_commands += f""" echo "" echo "文件列表:" ls -la {DEPLOY_PATH}/backend/ ls -la {DEPLOY_PATH}/deploy/ """ result = run_task(all_commands, f"create_files_{int(time.time())}", wait_time=15) if result and result.get("msg"): print("\n📋 执行结果:") print(result["msg"][:1500]) # 3. 重新构建 Docker print("\n🐳 重新构建 Docker 容器...") rebuild_script = f"""#!/bin/bash cd {DEPLOY_PATH}/deploy echo "停止旧容器..." docker-compose down 2>/dev/null || true sleep 2 echo "重新构建..." docker-compose up -d --build 2>&1 sleep 15 echo "" echo "容器状态:" docker ps --format 'table {{{{.Names}}}}\\t{{{{.Status}}}}\\t{{{{.Ports}}}}' echo "" echo "后端容器日志:" docker logs --tail 30 ai-interview-backend 2>&1 echo "" echo "测试后端:" curl -s http://127.0.0.1:8000/health 2>&1 || echo "后端未响应" """ result = run_task(rebuild_script, f"rebuild_{int(time.time())}", wait_time=90) if result and result.get("msg"): print("\n📋 构建结果:") print("-" * 60) print(result["msg"][:3000]) print() print("=" * 60) print("✅ 完成!") print("=" * 60) if __name__ == "__main__": main()