Files
011-ai-interview/backend/app/services/coze_service.py
2026-01-23 13:57:48 +08:00

840 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Coze API 服务封装
"""
import asyncio
import json
import time
from typing import Optional, Dict, Any

import httpx
from loguru import logger

from app.config import settings
class CozeService:
    """Async wrapper around the Coze HTTP API.

    Covers file upload and retrieval, running workflow A (interview
    initialisation) and workflow B (the interview itself), creating audio
    rooms, and text chat including polling for the assistant's reply.
    """

    # ID of workflow A (the initialisation workflow).
    INIT_WORKFLOW_ID = "7597357422713798710"
    # ID of workflow B (the interview workflow).
    INTERVIEW_WORKFLOW_ID = "7595077233002840079"
    # Space ID used only to build links into the Coze web console.
    COZE_SPACE_ID = "7516832346776780836"
    # Canned reply returned when a chat could not be started.
    _FALLBACK_REPLY = "抱歉,我没有理解您的意思,请再说一次。"

    def __init__(self):
        """Read the API base URL, auth token and bot ID from ``settings``."""
        self.base_url = settings.COZE_API_BASE
        self.headers = {
            "Authorization": f"Bearer {settings.COZE_PAT_TOKEN}",
            "Content-Type": "application/json",
        }
        self.bot_id = settings.COZE_BOT_ID

    # ------------------------------------------------------------------
    # Pure parsing helpers (no I/O)
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_session_id(output_data: Any) -> Optional[str]:
        """Find a session ID in the decoded output of the init workflow.

        Recognised shapes:
            1. ``{"session_id": "xxx"}``
            2. ``{"data": ...}`` where ``data`` is a ``SESS_``-prefixed
               string, a dict / list, or a JSON string encoding a dict / list
            3. ``[{"session_id": "xxx"}, ...]`` (first element only)

        Returns:
            The value found, or a falsy value when nothing matches.
        """
        if isinstance(output_data, list):
            first = output_data[0] if output_data else None
            return first.get("session_id") if isinstance(first, dict) else None
        if not isinstance(output_data, dict):
            return None

        session_id = output_data.get("session_id")
        if session_id or "data" not in output_data:
            return session_id

        inner = output_data.get("data")
        if isinstance(inner, str):
            if inner.startswith("SESS_"):
                return inner
            try:
                inner = json.loads(inner)
            except json.JSONDecodeError:
                return None
        if isinstance(inner, dict):
            return inner.get("session_id")
        if isinstance(inner, list) and inner and isinstance(inner[0], dict):
            return inner[0].get("session_id")
        return None

    @staticmethod
    def _parse_history_output(output: Any) -> Any:
        """Return the ``session_id`` from a run-history output, else the raw output.

        ``output`` is expected to be a JSON string; when it cannot be decoded,
        or does not decode to a dict, it is returned unchanged.
        """
        try:
            decoded = json.loads(output)
        except (json.JSONDecodeError, TypeError):
            return output
        if isinstance(decoded, dict):
            return decoded.get("session_id", output)
        return output

    @staticmethod
    def _parse_workflow_reply(output_str: str) -> Any:
        """Extract the reply text from the output string of workflow B.

        A JSON object is searched for ``reply``, then ``output``, then
        ``data``; a nested dict contributes its ``content``. A JSON string is
        returned as-is. Anything else falls back to the raw output string.
        """
        if output_str:
            try:
                output_data = json.loads(output_str)
            except json.JSONDecodeError:
                # Not JSON: hand back the raw string.
                return output_str
            if isinstance(output_data, dict):
                reply = (
                    output_data.get("reply")
                    or output_data.get("output")
                    or output_data.get("data", "")
                )
                if isinstance(reply, str):
                    return reply
                if isinstance(reply, dict):
                    return reply.get("content", str(reply))
            elif isinstance(output_data, str):
                return output_data
        return output_str or "工作流执行完成"

    @staticmethod
    def _find_assistant_answer(messages: list, require_content: bool = False) -> Optional[dict]:
        """Return the most recent assistant ``answer`` message, or ``None``.

        Args:
            messages: Message dicts as returned by ``_get_messages``.
            require_content: When true, answers with empty content are skipped.
        """
        for msg in reversed(messages):
            if msg.get("role") == "assistant" and msg.get("type") == "answer":
                if msg.get("content", "") or not require_content:
                    return msg
        return None

    # ------------------------------------------------------------------
    # Files
    # ------------------------------------------------------------------

    async def upload_file(self, file_content: bytes, filename: str) -> Dict[str, Any]:
        """Upload a file to Coze.

        Args:
            file_content: Raw bytes of the file.
            filename: File name sent with the upload.

        Returns:
            The ``data`` member of the JSON response when present, otherwise
            the whole response, e.g. ``{"id": "file_xxx", ...}``.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        url = f"{self.base_url}/v1/files/upload"
        # self.headers is not reused here because it pins Content-Type to
        # JSON; presumably httpx must set the multipart Content-Type itself.
        headers = {"Authorization": f"Bearer {settings.COZE_PAT_TOKEN}"}
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                url,
                files={"file": (filename, file_content)},
                headers=headers,
            )
            response.raise_for_status()
            data = response.json()
        logger.info(f"File uploaded: {data}")
        if "data" in data:
            return data["data"]
        return data

    async def get_file_url(self, file_id: str) -> str:
        """Fetch the temporary download URL of a Coze file.

        Args:
            file_id: ID of the file.

        Returns:
            The temporary download URL.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            ValueError: If the response carries no URL.
        """
        url = f"{self.base_url}/v1/files/retrieve"
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                url,
                params={"file_id": file_id},
                headers=self.headers,
            )
            response.raise_for_status()
            data = response.json()
        logger.info(f"File retrieve response: {data}")
        # "or {}" also covers an explicit null "data" member.
        file_url = (data.get("data") or {}).get("url", "")
        if not file_url:
            raise ValueError(f"Failed to get file URL for file_id: {file_id}")
        return file_url

    # ------------------------------------------------------------------
    # Workflow A: interview initialisation
    # ------------------------------------------------------------------

    async def run_init_workflow(
        self,
        name: str,
        file_url: str,
    ) -> Dict[str, Any]:
        """Run the initialisation workflow (workflow A).

        Args:
            name: Candidate name.
            file_url: URL of the resume file.

        Returns:
            A dict with ``session_id``, ``session_id_source``,
            ``raw_response``, ``parsed_data``, ``output_str``, ``debug_url``,
            ``execute_id``, ``code`` and ``msg``; ``parse_error`` is added
            when the workflow output is not valid JSON.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            ValueError: If the response ``code`` is not 0.
        """
        url = f"{self.base_url}/v1/workflow/run"
        payload = {
            "workflow_id": self.INIT_WORKFLOW_ID,
            "parameters": {
                "name": name,
                "file_url": file_url,
            },
        }
        logger.info(f"Running init workflow with payload: {payload}")
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            data = response.json()
        logger.info(f"Init workflow response: {data}")

        if data.get("code") != 0:
            error_msg = data.get("msg", "Unknown error")
            raise ValueError(f"Workflow execution failed: {error_msg}")

        # execute_id sits at the top level; "data" holds the workflow output
        # as a JSON string.
        execute_id = data.get("execute_id", "")
        output_str = data.get("data", "")
        logger.info(f"Workflow execute_id: {execute_id}")
        logger.info(f"Workflow output_str: {output_str}")

        result = {
            "session_id": "",
            "raw_response": data,
            "parsed_data": None,
            # Link to this execution in the Coze web console.
            "debug_url": (
                f"https://www.coze.cn/work_flow?execute_id={execute_id}"
                f"&space_id={self.COZE_SPACE_ID}"
                f"&workflow_id={self.INIT_WORKFLOW_ID}&execute_mode=2"
            ),
            "execute_id": execute_id,
            "code": data.get("code"),
            "msg": data.get("msg", ""),
            "output_str": output_str,
        }

        session_id = None
        if output_str:
            try:
                output_data = json.loads(output_str)
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse workflow output: {e}")
                result["parse_error"] = str(e)
            else:
                result["parsed_data"] = output_data
                logger.info(f"Workflow output_data: {output_data}")
                session_id = self._extract_session_id(output_data)

        if not session_id:
            # No usable ID in the output: derive one from execute_id, or from
            # the current time when execute_id is missing as well.
            logger.warning("No session_id in workflow output, using execute_id as session_id")
            session_id = f"WF_{execute_id}" if execute_id else f"SESS_{int(time.time())}"
            result["session_id_source"] = "execute_id_fallback"
        else:
            result["session_id_source"] = "workflow_output"

        result["session_id"] = session_id
        logger.info(f"Final session_id: {session_id}")
        return result

    async def _wait_for_workflow_result(
        self,
        execute_id: str,
        max_retries: int = 60,
    ) -> str:
        """Poll the run history until the workflow finishes.

        Polls once per second, at most ``max_retries`` times.

        Args:
            execute_id: Execution ID of the workflow run.
            max_retries: Maximum number of polls.

        Returns:
            The ``session_id`` from the workflow output when it is a JSON
            object containing one, otherwise the raw output.

        Raises:
            ValueError: If the run reports status ``Failed``.
            TimeoutError: If the run does not finish within ``max_retries``.
        """
        url = f"{self.base_url}/v1/workflow/run_histories"
        # One client for the whole polling loop so the connection is reused.
        async with httpx.AsyncClient(timeout=30.0) as client:
            for attempt in range(max_retries):
                response = await client.get(
                    url,
                    params={"execute_id": execute_id},
                    headers=self.headers,
                )
                if response.status_code == 200:
                    data = response.json()
                    logger.info(f"Workflow status [{attempt}]: {data}")
                    if data.get("code") == 0:
                        result_data = data.get("data") or {}
                        # NOTE(review): the history endpoint may return a list
                        # of runs rather than one object - confirm against the
                        # API; the first entry is used in that case.
                        if isinstance(result_data, list):
                            result_data = result_data[0] if result_data else {}
                        status = result_data.get("status", "")
                        if status == "Success":
                            return self._parse_history_output(result_data.get("output", ""))
                        if status == "Failed":
                            raise ValueError(f"Workflow failed: {result_data.get('error', 'Unknown')}")
                # Still running (or a transient error): wait and poll again.
                await asyncio.sleep(1)
        raise TimeoutError("Workflow execution timeout")

    async def init_interview(
        self,
        name: str,
        file_content: bytes,
        filename: str,
    ) -> Dict[str, Any]:
        """Run the full interview initialisation flow.

        1. Upload the resume to the remote file server to obtain a URL.
        2. Run the initialisation workflow with that URL.

        Args:
            name: Candidate name.
            file_content: Raw bytes of the resume file.
            filename: Name of the resume file.

        Returns:
            ``{"session_id": "xxx", "debug_info": {...}}``

        Raises:
            ValueError: If ``FILE_SERVER_TOKEN`` is not configured, the upload
                is rejected or returns no URL, or the workflow fails.
            httpx.HTTPStatusError: If a request answers with an error status.
        """
        debug_info = {
            "steps": [],
            "timestamps": {},
        }

        # Step 1: upload the file to the remote file server.
        debug_info["steps"].append("Step 1: Uploading file to remote server")
        debug_info["file_size"] = len(file_content)
        logger.info(f"Step 1: Uploading file to remote server ({len(file_content)} bytes)...")

        if not settings.FILE_SERVER_TOKEN:
            raise ValueError("FILE_SERVER_TOKEN is not configured. Please set FILE_SERVER_TOKEN in .env file.")

        upload_url = settings.FILE_SERVER_UPLOAD_URL
        async with httpx.AsyncClient(timeout=60.0) as client:
            # NOTE(review): the MIME type is always sent as PDF regardless of
            # the file name - confirm the server does not need the real type.
            files = {"file": (filename, file_content, "application/pdf")}
            form = {"token": settings.FILE_SERVER_TOKEN}
            response = await client.post(upload_url, files=files, data=form)
            response.raise_for_status()
            upload_result = response.json()
        logger.info(f"Upload response: {upload_result}")

        if upload_result.get("code") != 0:
            error_msg = upload_result.get("error", "Unknown upload error")
            raise ValueError(f"File upload failed: {error_msg}")

        file_url = upload_result.get("url", "")
        file_id = upload_result.get("file_id", "")
        if not file_url:
            # Without a URL the workflow would run against an empty input.
            raise ValueError("File upload failed: server response contains no file URL")

        debug_info["steps"].append("File uploaded successfully")
        debug_info["file_id"] = file_id
        debug_info["file_url"] = file_url
        debug_info["upload_response"] = upload_result
        logger.info(f"Step 1 completed: file_url={file_url}")

        # Step 2: run the initialisation workflow.
        logger.info(f"Step 2: Running init workflow with name={name}, file_url={file_url}...")
        debug_info["steps"].append("Step 2: Running init workflow")
        debug_info["workflow_input"] = {"name": name, "file_url": file_url}
        workflow_result = await self.run_init_workflow(name, file_url)

        session_id = workflow_result.get("session_id", "")
        debug_info["steps"].append("Workflow completed")
        debug_info["workflow_response"] = workflow_result.get("raw_response")
        debug_info["workflow_data"] = workflow_result.get("parsed_data")
        debug_info["debug_url"] = workflow_result.get("debug_url")
        logger.info(f"Init workflow completed, session_id: {session_id}")
        return {
            "session_id": session_id,
            "debug_info": debug_info,
        }

    # ------------------------------------------------------------------
    # Audio room
    # ------------------------------------------------------------------

    async def create_audio_room(
        self,
        user_id: str,
        file_id: Optional[str] = None,
        session_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Create an audio room for the bot to join.

        ``session_id``, when given, is sent as the request's ``user_id`` so
        the workflow can read it from ``sys_var.user_id``; it is additionally
        sent under several other field names as a best-effort attempt.

        Args:
            user_id: User ID, used when no ``session_id`` is given.
            file_id: Optional resume file ID.
            session_id: Optional session ID.

        Returns:
            The ``data`` member of the response (expected to hold the RTC
            connection info such as app_id, room_id, token and uid), or the
            whole response when ``data`` is missing or not an object, with an
            added ``debug_info`` entry.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        url = f"{self.base_url}/v1/audio/rooms"
        actual_user_id = session_id if session_id else user_id
        payload = {
            "bot_id": self.bot_id,
            "user_id": actual_user_id,
        }
        if settings.COZE_VOICE_ID:
            payload["voice_id"] = settings.COZE_VOICE_ID

        if session_id:
            # Send session_id under every field name that might be honoured:
            # workflow-style, chat-style, and a spare.
            payload["parameters"] = {"session_id": session_id}
            payload["custom_variables"] = {"session_id": session_id}
            payload["extra_info"] = {"session_id": session_id}

        config = {}
        if file_id:
            config["input_file_id"] = file_id
        if session_id:
            config["session_id"] = session_id
            config["parameters"] = {"session_id": session_id}
        if config:
            payload["config"] = config

        logger.info(f"Creating audio room with payload: {payload}")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            data = response.json()
        logger.info(f"Audio room created: {data}")

        result = data.get("data")
        if not isinstance(result, dict):
            # Missing or null "data": fall back to a copy of the whole
            # response so that debug_info can still be attached.
            result = dict(data)

        result["debug_info"] = {
            "session_id": session_id,
            "actual_user_id": actual_user_id,
            "bot_id": self.bot_id,
            "request_payload": payload,
            "coze_code": data.get("code"),
            "coze_msg": data.get("msg"),
            "coze_logid": (data.get("detail") or {}).get("logid"),
            "coze_bot_url": f"https://www.coze.cn/space/{self.COZE_SPACE_ID}/bot/{self.bot_id}",
            "search_hint": f"工作流可通过 {{{{sys_var.user_id}}}} 获取 session_id: {actual_user_id}",
            "tried_methods": [
                "user_id (as session_id)",
                "parameters.session_id",
                "custom_variables.session_id",
                "extra_info.session_id",
                "config.session_id",
                "config.parameters.session_id",
            ],
        }
        return result

    # ------------------------------------------------------------------
    # Text chat
    # ------------------------------------------------------------------

    async def chat_via_workflow(
        self,
        session_id: str,
        message: str,
        workflow_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Send one text message by running workflow B via ``/v1/workflow/run``.

        Args:
            session_id: Session ID (from workflow A).
            message: User message.
            workflow_id: Workflow ID; defaults to ``INTERVIEW_WORKFLOW_ID``.

        Returns:
            ``{"reply": "..."}``

        Raises:
            ValueError: If no workflow ID is available or the response
                ``code`` is not 0.
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        wf_id = workflow_id or self.INTERVIEW_WORKFLOW_ID
        if not wf_id:
            raise ValueError("工作流 B 的 ID 未配置,请设置 INTERVIEW_WORKFLOW_ID")

        url = f"{self.base_url}/v1/workflow/run"
        payload = {
            "workflow_id": wf_id,
            "parameters": {
                "session_id": session_id,
                "USER_INPUT": message,
            },
        }
        logger.info(f"Workflow chat request: {payload}")
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            data = response.json()
        logger.info(f"Workflow chat response: {data}")

        if data.get("code") != 0:
            error_msg = data.get("msg", "Unknown error")
            raise ValueError(f"Workflow execution failed: {error_msg}")
        return {"reply": self._parse_workflow_reply(data.get("data", ""))}

    async def chat(
        self,
        message: str,
        user_id: str,
        conversation_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Send one text message via ``/v1/workflows/chat``.

        Falls back to ``_chat_v3`` when the request fails, returns a non-200
        or empty response, or reports a non-zero ``code``. The fallback is
        invoked at most once; errors raised by it propagate to the caller.

        Args:
            message: User message.
            user_id: Interview session ID; also sent as ``parameters.session_id``.
            conversation_id: Conversation ID for multi-turn chats.

        Returns:
            ``{"reply": "...", "conversation_id": "...", "debug_info": {...}}``
        """
        url = f"{self.base_url}/v1/workflows/chat"
        payload = {
            "workflow_id": self.INTERVIEW_WORKFLOW_ID,
            "user_id": user_id,
            "stream": False,
            "additional_messages": [
                {
                    "role": "user",
                    "content": message,
                    "content_type": "text",
                }
            ],
            # Pass session_id explicitly through parameters.
            "parameters": {
                "session_id": user_id,
            },
        }
        if conversation_id:
            payload["conversation_id"] = conversation_id

        logger.info(f"[Workflows/Chat] request: session_id={user_id}, conv_id={conversation_id}, message={message[:50]}...")
        logger.debug(f"[Workflows/Chat] payload: {payload}")

        # ``data`` stays None whenever the primary endpoint is unusable. The
        # fallback is called after the try block so that an error raised by
        # the fallback itself cannot trigger a second fallback request.
        data = None
        async with httpx.AsyncClient(timeout=120.0) as client:
            try:
                response = await client.post(url, json=payload, headers=self.headers)
                logger.info(f"[Workflows/Chat] status: {response.status_code}, text: {response.text[:200] if response.text else 'empty'}")
                if response.status_code != 200 or not response.text:
                    logger.warning(f"[Workflows/Chat] failed (status={response.status_code}), falling back to /v3/chat")
                else:
                    parsed = response.json()
                    logger.info(f"[Workflows/Chat] response: code={parsed.get('code')}, msg={parsed.get('msg', '')}")
                    if parsed.get("code") != 0:
                        logger.warning("[Workflows/Chat] API error, falling back to /v3/chat")
                    else:
                        data = parsed
            except Exception as e:
                # Deliberate best-effort: any failure here means "use the fallback".
                logger.warning(f"[Workflows/Chat] exception: {e}, falling back to /v3/chat")
                data = None

        if data is None:
            return await self._chat_v3(message, user_id, conversation_id)

        chat_data = data.get("data") or {}
        conv_id = chat_data.get("conversation_id", "")
        chat_id = chat_data.get("id", "")
        logger.info(f"[Workflows/Chat] started: conv_id={conv_id}, chat_id={chat_id}")

        if chat_id and conv_id:
            result = await self._wait_for_reply(conv_id, chat_id)
            return {
                "reply": result.get("reply", ""),
                "conversation_id": conv_id,
                "debug_info": result.get("debug_info", {}),
            }
        return {
            "reply": self._FALLBACK_REPLY,
            "conversation_id": conversation_id or "",
            "debug_info": {"error": "Workflows/Chat API error"},
        }

    async def _chat_v3(
        self,
        message: str,
        user_id: str,
        conversation_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Fallback chat path using the ``/v3/chat`` API.

        On the first message of a conversation (no ``conversation_id``) the
        session ID is embedded in the message text as ``[SESSION:<id>]``.

        Returns:
            ``{"reply": "...", "conversation_id": "...", "debug_info": {...}}``

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        url = f"{self.base_url}/v3/chat"
        content = message if conversation_id else f"[SESSION:{user_id}]\n{message}"
        payload = {
            "bot_id": self.bot_id,
            "user_id": user_id,
            "stream": False,
            "auto_save_history": True,
            "additional_messages": [
                {
                    "role": "user",
                    "content": content,
                    "content_type": "text",
                }
            ],
        }
        if conversation_id:
            payload["conversation_id"] = conversation_id

        logger.info(f"[v3/chat] request: user_id={user_id}, conv_id={conversation_id}")
        logger.debug(f"[v3/chat] payload: {payload}")
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            data = response.json()
        logger.info(f"[v3/chat] response: code={data.get('code')}")

        if data.get("code") == 0:
            chat_data = data.get("data") or {}
            conv_id = chat_data.get("conversation_id", "")
            chat_id = chat_data.get("id", "")
            logger.info(f"[v3/chat] started: conv_id={conv_id}, chat_id={chat_id}")
            if chat_id and conv_id:
                result = await self._wait_for_reply(conv_id, chat_id)
                debug_info = result.get("debug_info", {})
                debug_info.update({
                    "conversation_id": conv_id,
                    "chat_id": chat_id,
                    "session_id": user_id,
                    # Link to the bot in the Coze web console.
                    "coze_bot_url": f"https://www.coze.cn/space/{self.COZE_SPACE_ID}/bot/{self.bot_id}",
                    "search_hint": f"在 Coze 后台搜索 conversation_id: {conv_id} 或 user_id: {user_id}",
                })
                return {
                    "reply": result.get("reply", ""),
                    "conversation_id": conv_id,
                    "debug_info": debug_info,
                }
        else:
            error_msg = data.get("msg", "Unknown error")
            logger.error(f"[v3/chat] API error: {error_msg}")

        # Reached on an API error or when the response carried no chat IDs.
        return {
            "reply": self._FALLBACK_REPLY,
            "conversation_id": conversation_id or "",
            "debug_info": {"error": "v3/chat API error"},
        }

    async def _wait_for_reply(
        self,
        conversation_id: str,
        chat_id: str,
        max_retries: int = 30,
    ) -> dict:
        """Poll ``/v3/chat/retrieve`` until the chat yields a reply.

        Polls once per second, at most ``max_retries`` times. Non-200
        responses and non-zero ``code`` values are skipped and retried.

        Returns:
            ``{"reply": str, "debug_info": dict}``; the reply is a timeout
            message when no terminal status is seen in time.
        """
        url = f"{self.base_url}/v3/chat/retrieve"
        params = {
            "conversation_id": conversation_id,
            "chat_id": chat_id,
        }
        debug_info = {
            "status_history": [],
            "messages": [],
            "raw_responses": [],
        }
        # One client for the whole polling loop so the connection is reused.
        async with httpx.AsyncClient(timeout=30.0) as client:
            for attempt in range(max_retries):
                response = await client.get(url, params=params, headers=self.headers)
                if response.status_code == 200:
                    data = response.json()
                    # Keep the raw response for debugging.
                    debug_info["raw_responses"].append({
                        "iteration": attempt,
                        "data": data,
                    })
                    if data.get("code") == 0:
                        outcome = await self._reply_for_status(
                            data.get("data") or {},
                            attempt,
                            conversation_id,
                            chat_id,
                            debug_info,
                        )
                        if outcome is not None:
                            return outcome
                # Any other status (e.g. in_progress, created): keep waiting.
                await asyncio.sleep(1)
        return {
            "reply": "响应超时,请重试。",
            "debug_info": debug_info,
        }

    async def _reply_for_status(
        self,
        chat_data: dict,
        attempt: int,
        conversation_id: str,
        chat_id: str,
        debug_info: dict,
    ) -> Optional[dict]:
        """Turn one polled chat state into a reply, or ``None`` to keep polling.

        Records the status in ``debug_info`` and, for the ``completed`` and
        ``requires_action`` states, fetches the message list to find the
        assistant's answer.
        """
        status = chat_data.get("status", "")
        required_action = chat_data.get("required_action")
        debug_info["status_history"].append({
            "iteration": attempt,
            "status": status,
            "required_action": required_action,
        })
        logger.info(f"Chat status [{attempt}]: {status}")
        if required_action:
            logger.info(f"🔔 Required action: {json.dumps(required_action, ensure_ascii=False, indent=2)}")

        if status == "completed":
            messages = await self._get_messages(conversation_id, chat_id)
            debug_info["messages"] = messages
            answer = self._find_assistant_answer(messages)
            reply = answer.get("content", "") if answer is not None else "面试官正在思考..."
            return {"reply": reply, "debug_info": debug_info}

        if status == "failed":
            error_msg = (chat_data.get("last_error") or {}).get("msg", "未知错误")
            return {
                "reply": f"抱歉,出现了一些问题:{error_msg}",
                "debug_info": debug_info,
            }

        if status == "requires_action":
            # The workflow is waiting for user input.
            messages = await self._get_messages(conversation_id, chat_id)
            debug_info["messages"] = messages
            logger.info(f"📨 Messages ({len(messages)} total):")
            for idx, msg in enumerate(messages):
                msg_type = msg.get("type", "unknown")
                msg_role = msg.get("role", "unknown")
                msg_content = str(msg.get("content") or "")[:200]
                logger.info(f" [{idx}] {msg_role}/{msg_type}: {msg_content}")
            for msg in messages:
                if msg.get("type") == "tool_call":
                    logger.info(f"🔧 Tool call detected: {msg.get('content', '')}")
            # Return the assistant's question, skipping empty answers.
            answer = self._find_assistant_answer(messages, require_content=True)
            reply = answer.get("content", "") if answer is not None else "请回答上面的问题..."
            return {"reply": reply, "debug_info": debug_info}

        return None

    async def _get_messages(
        self,
        conversation_id: str,
        chat_id: str,
    ) -> list:
        """Fetch the message list of a chat.

        Returns:
            The ``data`` member of the response, or an empty list when the
            request fails, reports a non-zero ``code``, or has no data.
        """
        url = f"{self.base_url}/v3/chat/message/list"
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                url,
                params={
                    "conversation_id": conversation_id,
                    "chat_id": chat_id,
                },
                headers=self.headers,
            )
        if response.status_code == 200:
            data = response.json()
            logger.info(f"Messages response: {data}")
            if data.get("code") == 0:
                # "or []" also covers an explicit null "data" member.
                return data.get("data") or []
        return []
# Create the module-level singleton instance.
coze_service = CozeService()