Files
011-ai-interview/deploy/upload_backend.py
2026-01-23 13:57:48 +08:00

213 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
上传后端代码到服务器并重新构建
"""
import requests
import time
import hashlib
import os
# Silence urllib3's InsecureRequestWarning: bt_api() posts with verify=False,
# which would otherwise emit this warning for unverified HTTPS requests.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# BaoTa (BT) panel connection settings and deployment paths.
# Every value can be overridden with an environment variable so the script is
# usable on other machines/servers without editing the source; an unset or
# empty variable falls back to the original literal.
BT_PANEL = os.environ.get("BT_PANEL") or "http://47.107.172.23:8888"
# NOTE(security): this API key is committed in source control. Prefer passing
# it through the BT_API_KEY environment variable, and rotate the key embedded
# here; the literal is kept only so existing invocations keep working.
BT_API_KEY = os.environ.get("BT_API_KEY") or "PKdfnaInQL0P5ghB8SvwbrGcIpXWaEvq"
# Remote directory that receives the backend/ and deploy/ trees.
DEPLOY_PATH = os.environ.get("AI_INTERVIEW_DEPLOY_PATH") or "/www/wwwroot/ai-interview"
# Local backend source directory (the default is a developer-specific path).
LOCAL_BACKEND = (
    os.environ.get("AI_INTERVIEW_LOCAL_BACKEND")
    or "/Users/a111/Documents/AgentWD/projects/011-ai-interview-2601/backend"
)
def bt_api(action, data=None, timeout=60):
    """Call a BaoTa panel API endpoint and return the decoded response.

    Every request is signed with ``request_time`` (the current Unix time) and
    ``request_token = md5(str(request_time) + md5(BT_API_KEY))``.

    Args:
        action: Path and query string appended to ``BT_PANEL``, for example
            ``"files?action=SaveFileBody"``.
        data: Optional dict of form fields to POST. It is copied before the
            signature fields are added, so the caller's dict is not modified.
        timeout: Request timeout in seconds.

    Returns:
        The parsed JSON body when the response is valid JSON.
        ``{"status": True, "msg": <first 500 chars of body>}`` when the body
        is not JSON.
        ``{"status": False, "msg": <error text>}`` when the request fails.
    """
    # Work on a copy: adding the signature fields must not leak into the
    # dict owned by the caller.
    payload = dict(data) if data else {}

    request_time = int(time.time())
    key_digest = hashlib.md5(BT_API_KEY.encode()).hexdigest()
    payload['request_time'] = request_time
    payload['request_token'] = hashlib.md5(
        f"{request_time}{key_digest}".encode()
    ).hexdigest()

    url = f"{BT_PANEL}/{action}"
    try:
        response = requests.post(url, data=payload, timeout=timeout, verify=False)
        try:
            return response.json()
        except ValueError:
            # Response.json() signals an undecodable body with a ValueError
            # subclass. Catching only that (instead of a bare ``except``)
            # lets KeyboardInterrupt/SystemExit propagate.
            # NOTE(review): a non-JSON body is reported as success here, as
            # in the original code -- confirm this is intended.
            return {"status": True, "msg": response.text[:500]}
    except Exception as e:
        # Best-effort boundary: callers only inspect the returned dict.
        return {"status": False, "msg": str(e)}
def write_remote_file(path, content):
    """Save ``content`` to ``path`` on the server via the SaveFileBody action.

    Progress is printed to stdout. Returns True when the panel response has a
    truthy ``status``, otherwise False.
    """
    print(f"📝 写入: {path}")

    payload = {"path": path, "data": content, "encoding": "utf-8"}
    response = bt_api("files?action=SaveFileBody", payload)

    succeeded = bool(response.get("status"))
    if not succeeded:
        # Fall back to the whole response when the panel sent no message.
        print(f" ❌ 失败: {response.get('msg', response)}")
        return False

    print(" ✅ 成功")
    return True
def create_directory(path):
    """Create ``path`` on the server via the CreateDir action.

    Returns a truthy value when the panel reports success, or when its
    message contains "已存在" (already exists); otherwise a falsy value.
    """
    outcome = bt_api("files?action=CreateDir", {"path": path})

    status = outcome.get("status")
    if status:
        return status

    # An "already exists" reply is treated the same as a successful creation.
    message = str(outcome.get("msg", ""))
    return "已存在" in message
def run_task(shell_body, task_name, wait_time=30):
    """Run a shell script on the server through a temporary panel crontab task.

    The script is registered as a crontab entry, started immediately, given
    ``wait_time`` seconds to run, and then its log is fetched. The crontab
    entry is always removed afterwards.

    Args:
        shell_body: Shell script text to execute on the server.
        task_name: Name given to the temporary crontab entry.
        wait_time: Seconds to wait after starting the task before reading
            its log.

    Returns:
        The response of the GetLogs call, or None when the task could not be
        created.
    """
    result = bt_api("crontab?action=AddCrontab", {
        "name": task_name,
        # Presumably "every N minutes" with N taken from where1 -- confirm
        # against the panel API. If so, a leftover entry keeps re-running.
        "type": "minute-n",
        "where1": "1",
        "hour": "",
        "minute": "",
        "week": "",
        "sType": "toShell",
        "sBody": shell_body,
        "sName": "",
        "backupTo": "",
        "save": "",
        "urladdress": "",
    })
    if not result.get("status") or not result.get("id"):
        print(f"创建任务失败: {result}")
        return None

    cron_id = result["id"]
    try:
        bt_api("crontab?action=StartTask", {"id": cron_id}, timeout=300)
        print(f" 任务 {cron_id} 已启动,等待 {wait_time} 秒...")
        time.sleep(wait_time)
        return bt_api("crontab?action=GetLogs", {"id": cron_id})
    finally:
        # Remove the temporary entry even if the wait is interrupted (for
        # example by Ctrl-C); otherwise it stays scheduled on the server.
        bt_api("crontab?action=DelCrontab", {"id": cron_id})
def _upload_text_file(local_path, remote_path):
    """Read a local UTF-8 text file and write it to ``remote_path``; return success."""
    with open(local_path, 'r', encoding='utf-8') as f:
        return write_remote_file(remote_path, f.read())


def _build_rebuild_script():
    """Return the bash script that rebuilds and restarts the Docker containers."""
    # Quadruple braces are f-string escaping: the script receives the Go
    # template placeholders {{.Names}}, {{.Status}} and {{.Ports}}.
    return f"""#!/bin/bash
cd {DEPLOY_PATH}/deploy
echo "停止旧容器..."
docker-compose down 2>/dev/null || true
sleep 2
echo "清理旧镜像..."
docker rmi deploy-backend 2>/dev/null || true
echo "重新构建..."
docker-compose up -d --build
sleep 10
echo "容器状态:"
docker ps --format 'table {{{{.Names}}}}\\t{{{{.Status}}}}\\t{{{{.Ports}}}}'
echo ""
echo "后端日志:"
docker logs --tail 20 ai-interview-backend 2>&1
"""


def main():
    """Upload the backend and deploy files, then rebuild the Docker containers.

    Steps:
        1. Create the remote directory structure.
        2. Upload the backend source files from ``LOCAL_BACKEND``.
        3. Upload the deploy configuration that sits next to this script.
        4. Rebuild the containers through a temporary panel task.

    Directory or file operations that fail are collected and listed in the
    final banner instead of being reported as a success. The rebuild step
    still runs in that case, as it always did.
    """
    failures = []  # remote/local paths whose create or upload step failed
    script_dir = os.path.dirname(os.path.abspath(__file__))

    print("=" * 60)
    print("📤 上传后端代码到服务器")
    print("=" * 60)
    print()

    # 1. Create the remote directory structure.
    print("📁 步骤 1: 创建目录结构")
    dirs = [
        f"{DEPLOY_PATH}/backend",
        f"{DEPLOY_PATH}/backend/app",
        f"{DEPLOY_PATH}/backend/app/routers",
        f"{DEPLOY_PATH}/backend/app/services",
        # Step 3 writes into this directory, so make sure it exists too.
        f"{DEPLOY_PATH}/deploy",
    ]
    for d in dirs:
        if not create_directory(d):
            failures.append(d)
        print(f"{d}")

    # 2. Upload the backend source files (paths relative to LOCAL_BACKEND;
    #    the same relative path is used below the remote backend directory).
    print("\n📤 步骤 2: 上传后端代码")
    backend_files = [
        "main.py",
        "requirements.txt",
        "app/__init__.py",
        "app/config.py",
        "app/schemas.py",
        "app/routers/__init__.py",
        "app/routers/admin.py",
        "app/routers/candidate.py",
        "app/routers/chat.py",
        "app/routers/files.py",
        "app/routers/init.py",
        "app/routers/room.py",
        "app/routers/upload.py",
        "app/services/__init__.py",
        "app/services/coze_service.py",
    ]
    for rel_path in backend_files:
        local_path = os.path.join(LOCAL_BACKEND, rel_path)
        if not os.path.exists(local_path):
            print(f" ⚠️ 文件不存在: {local_path}")
            failures.append(local_path)
            continue
        remote_path = f"{DEPLOY_PATH}/backend/{rel_path}"
        if not _upload_text_file(local_path, remote_path):
            failures.append(remote_path)

    # 3. Upload the deploy configuration. Files that are absent locally are
    #    skipped without a warning, as before.
    print("\n📤 步骤 3: 上传部署配置")
    deploy_files = [
        "docker-compose.yml",
        "Dockerfile.backend",
        "Dockerfile.frontend",
    ]
    for filename in deploy_files:
        local_path = os.path.join(script_dir, filename)
        if os.path.exists(local_path):
            remote_path = f"{DEPLOY_PATH}/deploy/{filename}"
            if not _upload_text_file(local_path, remote_path):
                failures.append(remote_path)

    # nginx configuration
    nginx_conf_path = os.path.join(script_dir, "nginx", "frontend.conf")
    if os.path.exists(nginx_conf_path):
        nginx_dir = f"{DEPLOY_PATH}/deploy/nginx"
        if not create_directory(nginx_dir):
            failures.append(nginx_dir)
        remote_path = f"{nginx_dir}/frontend.conf"
        if not _upload_text_file(nginx_conf_path, remote_path):
            failures.append(remote_path)

    # 4. Rebuild the Docker containers.
    print("\n🐳 步骤 4: 重新构建 Docker 容器")
    print(" 🚀 执行重建命令...")
    result = run_task(
        _build_rebuild_script(),
        f"rebuild_docker_{int(time.time())}",
        wait_time=60,
    )
    if result and result.get("msg"):
        print("\n📋 执行结果:")
        print("-" * 40)
        # str() guards the slice in case the panel returns a non-string msg.
        print(str(result["msg"])[:2000])

    print()
    print("=" * 60)
    if failures:
        print(f"⚠️ 上传完成,但有 {len(failures)} 项失败:")
        for item in failures:
            print(f" - {item}")
    else:
        print("✅ 上传完成!")
    print("=" * 60)
# Run the upload only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()