"""
Coze API service wrapper.
"""

import asyncio
import json
import time
from typing import Any, Dict, Optional

import httpx
from loguru import logger

from app.config import settings


class CozeService:
    """Thin async client around the Coze HTTP endpoints used by the interview flow.

    Every method opens its own short-lived ``httpx.AsyncClient``; the service
    object itself only holds the base URL, the default headers and the bot ID
    read from ``settings``.
    """

    # ID of workflow A (the initialisation workflow).
    INIT_WORKFLOW_ID = "7597357422713798710"

    # ID of workflow B (the interview workflow).
    INTERVIEW_WORKFLOW_ID = "7595077233002840079"

    # Space ID embedded in the coze.cn console links that are returned as
    # debugging aids.
    COZE_SPACE_ID = "7516832346776780836"

    def __init__(self):
        self.base_url = settings.COZE_API_BASE
        self.headers = {
            "Authorization": f"Bearer {settings.COZE_PAT_TOKEN}",
            "Content-Type": "application/json",
        }
        self.bot_id = settings.COZE_BOT_ID

    async def upload_file(self, file_content: bytes, filename: str) -> Dict[str, Any]:
        """
        Upload a file to Coze.

        Args:
            file_content: Raw file bytes.
            filename: File name sent in the multipart form.

        Returns:
            The ``data`` object of the response when present, otherwise the
            whole decoded response (e.g. ``{"id": "file_xxx", ...}``).

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        url = f"{self.base_url}/v1/files/upload"

        async with httpx.AsyncClient(timeout=60.0) as client:
            files = {"file": (filename, file_content)}
            # Only the auth header is sent: httpx must generate the multipart
            # Content-Type (with boundary) itself, so self.headers is not used.
            headers = {"Authorization": f"Bearer {settings.COZE_PAT_TOKEN}"}

            response = await client.post(url, files=files, headers=headers)
            response.raise_for_status()

            data = response.json()
            logger.info(f"File uploaded: {data}")

            if "data" in data:
                return data["data"]
            return data

    async def get_file_url(self, file_id: str) -> str:
        """
        Fetch the temporary download link of a Coze file.

        Args:
            file_id: Coze file ID.

        Returns:
            The temporary download URL of the file.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            ValueError: If the response carries no URL.
        """
        url = f"{self.base_url}/v1/files/retrieve"

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                url,
                params={"file_id": file_id},
                headers=self.headers,
            )
            response.raise_for_status()

            data = response.json()
            logger.info(f"File retrieve response: {data}")

            # `or {}` also covers an explicit `"data": null` in the response.
            file_info = data.get("data") or {}
            file_url = file_info.get("url", "")

            if not file_url:
                raise ValueError(f"Failed to get file URL for file_id: {file_id}")

            return file_url

    @staticmethod
    def _session_id_from(candidate: Any) -> Optional[str]:
        """Return ``session_id`` from a dict, or from the first dict of a list."""
        if isinstance(candidate, dict):
            return candidate.get("session_id")
        if isinstance(candidate, list) and candidate and isinstance(candidate[0], dict):
            return candidate[0].get("session_id")
        return None

    @classmethod
    def _extract_session_id(cls, output_data: Any) -> Optional[str]:
        """Pull a session ID out of the decoded workflow output.

        Supported shapes:
            1. ``{"session_id": "xxx"}``
            2. ``{"data": "SESS_xxx"}``, or ``{"data": ...}`` where the value is
               a dict/list (optionally JSON-encoded) containing ``session_id``
            3. ``[{"session_id": "xxx"}]``

        Returns a falsy value when no session ID can be found.
        """
        if not isinstance(output_data, dict):
            # Shape 3, or something unsupported (-> None).
            return cls._session_id_from(output_data)

        # Shape 1.
        session_id = output_data.get("session_id")
        if session_id or "data" not in output_data:
            return session_id

        # Shape 2.
        inner = output_data.get("data")
        if isinstance(inner, str):
            if inner.startswith("SESS_"):
                # "data" is the session ID itself.
                return inner
            try:
                inner = json.loads(inner)
            except json.JSONDecodeError:
                return None
        return cls._session_id_from(inner)

    async def run_init_workflow(
        self,
        name: str,
        file_url: str,
    ) -> Dict[str, Any]:
        """
        Run the initialisation workflow (workflow A).

        Posts ``name`` and ``file_url`` to ``/v1/workflow/run`` and extracts a
        session ID from the workflow output. When the output has no session
        ID, one is derived from the execute ID (``WF_<execute_id>``) or, as a
        last resort, from the current time (``SESS_<epoch>``).

        Args:
            name: Candidate name.
            file_url: Link to the resume file.

        Returns:
            A dict with ``session_id``, ``session_id_source``,
            ``raw_response``, ``parsed_data``, ``debug_url``, ``execute_id``,
            ``code``, ``msg``, ``output_str`` and, when the output could not
            be decoded, ``parse_error``.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            ValueError: If the response ``code`` is not 0.
        """
        url = f"{self.base_url}/v1/workflow/run"

        payload = {
            "workflow_id": self.INIT_WORKFLOW_ID,
            "parameters": {
                "name": name,
                "file_url": file_url,
            }
        }

        logger.info(f"Running init workflow with payload: {payload}")

        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()

            data = response.json()
            logger.info(f"Init workflow response: {data}")

        if data.get("code") != 0:
            error_msg = data.get("msg", "Unknown error")
            raise ValueError(f"Workflow execution failed: {error_msg}")

        # execute_id lives at the top level of the response.
        execute_id = data.get("execute_id", "")
        # The "data" field is the workflow output (a JSON string).
        output_str = data.get("data", "")

        logger.info(f"Workflow execute_id: {execute_id}")
        logger.info(f"Workflow output_str: {output_str}")

        result = {
            "session_id": "",
            "raw_response": data,
            "parsed_data": None,
            # Console link for inspecting this run.
            "debug_url": (
                f"https://www.coze.cn/work_flow?execute_id={execute_id}"
                f"&space_id={self.COZE_SPACE_ID}"
                f"&workflow_id={self.INIT_WORKFLOW_ID}&execute_mode=2"
            ),
            "execute_id": execute_id,
            "code": data.get("code"),
            "msg": data.get("msg", ""),
            "output_str": output_str,
        }

        # Decode the workflow output and look for a session ID in it.
        session_id = None
        if output_str:
            try:
                output_data = json.loads(output_str)
            except (json.JSONDecodeError, TypeError) as e:
                logger.warning(f"Failed to parse workflow output: {e}")
                result["parse_error"] = str(e)
            else:
                result["parsed_data"] = output_data
                logger.info(f"Workflow output_data: {output_data}")
                session_id = self._extract_session_id(output_data)

        if session_id:
            result["session_id_source"] = "workflow_output"
        else:
            # No session ID in the output: fall back to the execute ID.
            logger.warning("No session_id in workflow output, using execute_id as session_id")
            session_id = f"WF_{execute_id}" if execute_id else f"SESS_{int(time.time())}"
            result["session_id_source"] = "execute_id_fallback"

        result["session_id"] = session_id
        logger.info(f"Final session_id: {session_id}")

        return result

    async def _wait_for_workflow_result(
        self,
        execute_id: str,
        max_retries: int = 60,
    ) -> str:
        """
        Poll the run history until the workflow finishes, then return its result.

        Polls once per second, up to ``max_retries`` times.

        Args:
            execute_id: Execute ID of the workflow run.
            max_retries: Maximum number of polls.

        Returns:
            ``session_id`` from the JSON output when the output decodes to a
            dict (the raw output if the key is absent); otherwise the raw
            output.

        Raises:
            ValueError: If the run reports status ``Failed``.
            TimeoutError: If the run has not finished after ``max_retries`` polls.
        """
        url = f"{self.base_url}/v1/workflow/run_histories"

        # One client is reused for all polls instead of reconnecting each time.
        async with httpx.AsyncClient(timeout=30.0) as client:
            for i in range(max_retries):
                response = await client.get(
                    url,
                    params={"execute_id": execute_id},
                    headers=self.headers,
                )

                if response.status_code == 200:
                    data = response.json()
                    logger.info(f"Workflow status [{i}]: {data}")

                    if data.get("code") == 0:
                        result_data = data.get("data", {})
                        status = result_data.get("status", "")

                        if status == "Success":
                            output = result_data.get("output", "")
                            # Try to decode the output and pick out session_id.
                            try:
                                output_data = json.loads(output)
                            except (TypeError, ValueError):
                                return output
                            if isinstance(output_data, dict):
                                return output_data.get("session_id", output)
                            return output
                        elif status == "Failed":
                            raise ValueError(f"Workflow failed: {result_data.get('error', 'Unknown')}")
                        # Any other status (e.g. Running): keep waiting.

                await asyncio.sleep(1)

        raise TimeoutError("Workflow execution timeout")

    async def init_interview(
        self,
        name: str,
        file_content: bytes,
        filename: str,
    ) -> Dict[str, Any]:
        """
        Run the complete interview initialisation flow.

        1. Upload the file to the remote file server
           (``settings.FILE_SERVER_UPLOAD_URL``), which returns its URL.
        2. Run the initialisation workflow with that URL.
        3. Return the session ID together with debugging information.

        Args:
            name: Candidate name.
            file_content: Resume file bytes.
            filename: File name.

        Returns:
            {"session_id": "xxx", "debug_info": {...}}

        Raises:
            ValueError: If ``FILE_SERVER_TOKEN`` is not configured, the upload
                is rejected, the upload response has no URL, or the workflow
                fails.
            httpx.HTTPStatusError: If a request answers with an error status.
        """
        debug_info = {
            "steps": [],
            "timestamps": {},
        }

        # 1. Upload the file to the remote server.
        debug_info["steps"].append("Step 1: Uploading file to remote server")
        debug_info["file_size"] = len(file_content)
        logger.info(f"Step 1: Uploading file to remote server ({len(file_content)} bytes)...")

        # Check configuration before doing any network I/O.
        if not settings.FILE_SERVER_TOKEN:
            raise ValueError("FILE_SERVER_TOKEN is not configured. Please set FILE_SERVER_TOKEN in .env file.")

        # Call the remote upload endpoint.
        upload_url = settings.FILE_SERVER_UPLOAD_URL

        async with httpx.AsyncClient(timeout=60.0) as client:
            # NOTE(review): the part is always labelled application/pdf,
            # whatever the file name says - confirm the server expects this.
            files = {"file": (filename, file_content, "application/pdf")}
            data = {"token": settings.FILE_SERVER_TOKEN}

            response = await client.post(upload_url, files=files, data=data)
            response.raise_for_status()

            upload_result = response.json()
            logger.info(f"Upload response: {upload_result}")

        if upload_result.get("code") != 0:
            error_msg = upload_result.get("error", "Unknown upload error")
            raise ValueError(f"File upload failed: {error_msg}")

        file_url = upload_result.get("url", "")
        file_id = upload_result.get("file_id", "")

        # Fail here rather than starting the workflow with an empty link.
        if not file_url:
            raise ValueError("File upload failed: no url in upload response")

        debug_info["steps"].append("File uploaded successfully")
        debug_info["file_id"] = file_id
        debug_info["file_url"] = file_url
        debug_info["upload_response"] = upload_result
        logger.info(f"Step 1 completed: file_url={file_url}")

        # 2. Run the initialisation workflow.
        logger.info(f"Step 2: Running init workflow with name={name}, file_url={file_url}...")
        debug_info["steps"].append("Step 2: Running init workflow")
        debug_info["workflow_input"] = {"name": name, "file_url": file_url}

        workflow_result = await self.run_init_workflow(name, file_url)

        session_id = workflow_result.get("session_id", "")

        debug_info["steps"].append("Workflow completed")
        debug_info["workflow_response"] = workflow_result.get("raw_response")
        debug_info["workflow_data"] = workflow_result.get("parsed_data")
        debug_info["debug_url"] = workflow_result.get("debug_url")

        logger.info(f"Init workflow completed, session_id: {session_id}")

        return {
            "session_id": session_id,
            "debug_info": debug_info
        }

    async def create_audio_room(
        self,
        user_id: str,
        file_id: Optional[str] = None,
        session_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Create a voice room for the bot via ``/v1/audio/rooms``.

        According to the original author's notes the API answers with the RTC
        connection details (app_id, room_id, token, uid).

        When ``session_id`` is given it is sent as ``user_id`` (so a workflow
        can read it from ``sys_var.user_id``) and is additionally placed in
        several candidate payload fields, since it is not established here
        which of them the API honours.

        Args:
            user_id: User ID; only sent when ``session_id`` is empty.
            file_id: Resume file ID (optional).
            session_id: Session ID (optional).

        Returns:
            The ``data`` object of the response (or the whole response when
            ``data`` is missing or not an object), with an extra
            ``debug_info`` entry describing the request.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        url = f"{self.base_url}/v1/audio/rooms"

        # Use session_id as user_id when available; otherwise the real user_id.
        actual_user_id = session_id if session_id else user_id

        payload = {
            "bot_id": self.bot_id,
            "user_id": actual_user_id,
        }

        # Optional voice ID.
        if settings.COZE_VOICE_ID:
            payload["voice_id"] = settings.COZE_VOICE_ID

        # ===== Try several ways of passing session_id =====
        if session_id:
            # Method 1: parameters (as in the workflow API).
            payload["parameters"] = {
                "session_id": session_id,
            }
            # Method 2: custom_variables (as in the chat API).
            payload["custom_variables"] = {
                "session_id": session_id,
            }
            # Method 3: extra_info (fallback).
            payload["extra_info"] = {
                "session_id": session_id,
            }

        # Method 4: config object.
        config = {}
        if file_id:
            config["input_file_id"] = file_id
        if session_id:
            config["session_id"] = session_id
            config["parameters"] = {"session_id": session_id}

        if config:
            payload["config"] = config

        logger.info(f"Creating audio room with payload: {payload}")

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()

            data = response.json()
            logger.info(f"Audio room created: {data}")

        room = data.get("data", data)
        # `"data": null` (or any non-object) cannot take the debug_info key;
        # fall back to the top-level response in that case.
        result = room if isinstance(room, dict) else data

        # Debug information (the response body itself is deliberately not
        # embedded, which would create a circular reference).
        result["debug_info"] = {
            "session_id": session_id,
            "actual_user_id": actual_user_id,
            "bot_id": self.bot_id,
            "request_payload": payload,  # full request, including every attempted field
            "coze_code": data.get("code"),
            "coze_msg": data.get("msg"),
            "coze_logid": (data.get("detail") or {}).get("logid"),
            "coze_bot_url": f"https://www.coze.cn/space/{self.COZE_SPACE_ID}/bot/{self.bot_id}",
            "search_hint": f"工作流可通过 {{{{sys_var.user_id}}}} 获取 session_id: {actual_user_id}",
            "tried_methods": [
                "user_id (as session_id)",
                "parameters.session_id",
                "custom_variables.session_id",
                "extra_info.session_id",
                "config.session_id",
                "config.parameters.session_id",
            ],
        }

        return result

    async def chat_via_workflow(
        self,
        session_id: str,
        message: str,
        workflow_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Text chat through the workflow API (``/v1/workflow/run``).

        Calls workflow B directly, passing ``session_id`` and the user input
        as workflow parameters.

        Args:
            session_id: Session ID (from workflow A).
            message: User message.
            workflow_id: Workflow ID (optional; defaults to
                ``INTERVIEW_WORKFLOW_ID``).

        Returns:
            {"reply": "<AI reply>"}

        Raises:
            ValueError: If no workflow ID is available or the response
                ``code`` is not 0.
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        wf_id = workflow_id or self.INTERVIEW_WORKFLOW_ID

        if not wf_id:
            raise ValueError("工作流 B 的 ID 未配置,请设置 INTERVIEW_WORKFLOW_ID")

        url = f"{self.base_url}/v1/workflow/run"

        payload = {
            "workflow_id": wf_id,
            "parameters": {
                "session_id": session_id,
                "USER_INPUT": message,  # the user's input
            }
        }

        logger.info(f"Workflow chat request: {payload}")

        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()

            data = response.json()
            logger.info(f"Workflow chat response: {data}")

        if data.get("code") != 0:
            error_msg = data.get("msg", "Unknown error")
            raise ValueError(f"Workflow execution failed: {error_msg}")

        # Decode the workflow output.
        output_str = data.get("data", "")

        try:
            if output_str:
                output_data = json.loads(output_str)

                # Try the known reply fields in order.
                if isinstance(output_data, dict):
                    reply = output_data.get("reply") or output_data.get("output") or output_data.get("data", "")
                    if isinstance(reply, str):
                        return {"reply": reply}
                    elif isinstance(reply, dict):
                        return {"reply": reply.get("content", str(reply))}
                elif isinstance(output_data, str):
                    return {"reply": output_data}
        except json.JSONDecodeError:
            # Not JSON: return the raw string.
            return {"reply": output_str}

        return {"reply": output_str or "工作流执行完成"}

    async def chat(
        self,
        message: str,
        user_id: str,
        conversation_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Text chat through the ``/v1/workflows/chat`` endpoint.

        The request targets workflow B and passes ``user_id`` both as the
        user ID and as the ``session_id`` parameter. If the request fails
        (non-200 status, empty body, non-zero ``code`` or any exception) the
        call falls back to ``_chat_v3`` exactly once.

        Args:
            message: User message.
            user_id: The interview session ID.
            conversation_id: Conversation ID for multi-turn chat (optional).

        Returns:
            {"reply": "<AI reply>", "conversation_id": "xxx", "debug_info": {...}}
        """
        url = f"{self.base_url}/v1/workflows/chat"

        payload = {
            "workflow_id": self.INTERVIEW_WORKFLOW_ID,
            "user_id": user_id,
            "stream": False,
            "additional_messages": [
                {
                    "role": "user",
                    "content": message,
                    "content_type": "text",
                }
            ],
            # Pass session_id explicitly through parameters.
            "parameters": {
                "session_id": user_id,
            },
        }

        # Continue an existing conversation when an ID is supplied.
        if conversation_id:
            payload["conversation_id"] = conversation_id

        logger.info(f"[Workflows/Chat] request: session_id={user_id}, conv_id={conversation_id}, message={message[:50]}...")
        logger.debug(f"[Workflows/Chat] payload: {payload}")

        # The fallback is decided inside the try block but executed after it,
        # so an error raised by _chat_v3 itself is not caught here and does
        # not trigger a second _chat_v3 call (which would resend the message).
        fallback_reason: Optional[str] = None
        data: Dict[str, Any] = {}

        async with httpx.AsyncClient(timeout=120.0) as client:
            try:
                response = await client.post(url, json=payload, headers=self.headers)

                logger.info(f"[Workflows/Chat] status: {response.status_code}, text: {response.text[:200] if response.text else 'empty'}")

                if response.status_code != 200 or not response.text:
                    fallback_reason = f"failed (status={response.status_code})"
                else:
                    data = response.json()
                    logger.info(f"[Workflows/Chat] response: code={data.get('code')}, msg={data.get('msg', '')}")

                    if data.get("code") != 0:
                        fallback_reason = "API error"
            except Exception as e:
                # Deliberately broad: any failure of this endpoint should
                # degrade to /v3/chat rather than fail the user's request.
                fallback_reason = f"exception: {e}"

        if fallback_reason is not None:
            logger.warning(f"[Workflows/Chat] {fallback_reason}, falling back to /v3/chat")
            return await self._chat_v3(message, user_id, conversation_id)

        # Parse the response.
        chat_data = data.get("data") or {}
        conv_id = chat_data.get("conversation_id", "")
        chat_id = chat_data.get("id", "")

        logger.info(f"[Workflows/Chat] started: conv_id={conv_id}, chat_id={chat_id}")

        # Poll until the reply is ready.
        if chat_id and conv_id:
            result = await self._wait_for_reply(conv_id, chat_id)
            return {
                "reply": result.get("reply", ""),
                "conversation_id": conv_id,
                "debug_info": result.get("debug_info", {}),
            }

        return {
            "reply": "抱歉,我没有理解您的意思,请再说一次。",
            "conversation_id": conversation_id or "",
            "debug_info": {"error": "Workflows/Chat API error"},
        }

    async def _chat_v3(
        self,
        message: str,
        user_id: str,
        conversation_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Fallback chat implementation using the ``/v3/chat`` API.

        On the first turn (no ``conversation_id``) the session ID is embedded
        in the message as a ``[SESSION:<user_id>]`` prefix.

        Args:
            message: User message.
            user_id: The interview session ID.
            conversation_id: Conversation ID for multi-turn chat (optional).

        Returns:
            {"reply": "<AI reply>", "conversation_id": "xxx", "debug_info": {...}}

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        url = f"{self.base_url}/v3/chat"

        # Embed the session ID in the first message of a conversation.
        content = message if conversation_id else f"[SESSION:{user_id}]\n{message}"

        payload = {
            "bot_id": self.bot_id,
            "user_id": user_id,
            "stream": False,
            "auto_save_history": True,
            "additional_messages": [
                {
                    "role": "user",
                    "content": content,
                    "content_type": "text",
                }
            ],
        }

        if conversation_id:
            payload["conversation_id"] = conversation_id

        logger.info(f"[v3/chat] request: user_id={user_id}, conv_id={conversation_id}")
        logger.debug(f"[v3/chat] payload: {payload}")

        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(url, json=payload, headers=self.headers)
            response.raise_for_status()

            data = response.json()
            logger.info(f"[v3/chat] response: code={data.get('code')}")

        if data.get("code") == 0:
            chat_data = data.get("data") or {}
            conv_id = chat_data.get("conversation_id", "")
            chat_id = chat_data.get("id", "")

            logger.info(f"[v3/chat] started: conv_id={conv_id}, chat_id={chat_id}")

            if chat_id and conv_id:
                result = await self._wait_for_reply(conv_id, chat_id)

                # Link to the bot in the Coze console, for manual lookup.
                coze_debug_url = f"https://www.coze.cn/space/{self.COZE_SPACE_ID}/bot/{self.bot_id}"

                debug_info = result.get("debug_info", {})
                debug_info.update({
                    "conversation_id": conv_id,
                    "chat_id": chat_id,
                    "session_id": user_id,
                    "coze_bot_url": coze_debug_url,
                    "search_hint": f"在 Coze 后台搜索 conversation_id: {conv_id} 或 user_id: {user_id}",
                })

                return {
                    "reply": result.get("reply", ""),
                    "conversation_id": conv_id,
                    "debug_info": debug_info,
                }
        else:
            error_msg = data.get("msg", "Unknown error")
            logger.error(f"[v3/chat] API error: {error_msg}")

        # Reached on an API error, or when the response lacked the IDs
        # needed to poll for the reply.
        return {
            "reply": "抱歉,我没有理解您的意思,请再说一次。",
            "conversation_id": conversation_id or "",
            "debug_info": {"error": "v3/chat API error"},
        }

    async def _wait_for_reply(
        self,
        conversation_id: str,
        chat_id: str,
        max_retries: int = 30,
    ) -> dict:
        """
        Poll ``/v3/chat/retrieve`` until the AI reply is available.

        Polls once per second, up to ``max_retries`` times. Terminal statuses
        are ``completed``, ``failed`` and ``requires_action``; every other
        status keeps the loop waiting.

        Args:
            conversation_id: Conversation ID.
            chat_id: Chat ID.
            max_retries: Maximum number of polls.

        Returns:
            dict: {"reply": str, "debug_info": dict}. ``debug_info`` holds the
            status history, the fetched messages and every raw poll response.
        """
        url = f"{self.base_url}/v3/chat/retrieve"

        debug_info = {
            "status_history": [],
            "messages": [],
            "raw_responses": [],
        }

        # One client is reused for all polls instead of reconnecting each time.
        async with httpx.AsyncClient(timeout=30.0) as client:
            for i in range(max_retries):
                response = await client.get(
                    url,
                    params={
                        "conversation_id": conversation_id,
                        "chat_id": chat_id,
                    },
                    headers=self.headers,
                )

                if response.status_code == 200:
                    data = response.json()

                    # Keep the raw response for debugging.
                    debug_info["raw_responses"].append({
                        "iteration": i,
                        "data": data
                    })

                    if data.get("code") == 0:
                        chat_data = data.get("data") or {}
                        status = chat_data.get("status", "")

                        # Record the status history.
                        status_info = {
                            "iteration": i,
                            "status": status,
                            "required_action": chat_data.get("required_action"),
                        }
                        debug_info["status_history"].append(status_info)

                        logger.info(f"Chat status [{i}]: {status}")

                        # Log the details of any pending action.
                        if chat_data.get("required_action"):
                            action = chat_data.get("required_action")
                            logger.info(f"🔔 Required action: {json.dumps(action, ensure_ascii=False, indent=2)}")

                        if status == "completed":
                            messages = await self._get_messages(conversation_id, chat_id)
                            debug_info["messages"] = messages

                            # The most recent assistant answer is the reply.
                            for msg in reversed(messages):
                                if msg.get("role") == "assistant" and msg.get("type") == "answer":
                                    return {
                                        "reply": msg.get("content", ""),
                                        "debug_info": debug_info,
                                    }
                            return {
                                "reply": "面试官正在思考...",
                                "debug_info": debug_info,
                            }

                        elif status == "failed":
                            last_error = chat_data.get("last_error") or {}
                            return {
                                "reply": f"抱歉,出现了一些问题:{last_error.get('msg', '未知错误')}",
                                "debug_info": debug_info,
                            }

                        elif status == "requires_action":
                            # The workflow is waiting for user input.
                            messages = await self._get_messages(conversation_id, chat_id)
                            debug_info["messages"] = messages

                            # Log every message for debugging.
                            logger.info(f"📨 Messages ({len(messages)} total):")
                            for idx, msg in enumerate(messages):
                                msg_type = msg.get("type", "unknown")
                                msg_role = msg.get("role", "unknown")
                                msg_content = (msg.get("content") or "")[:200]
                                logger.info(f" [{idx}] {msg_role}/{msg_type}: {msg_content}")

                            # Log any tool calls present in the messages.
                            for msg in messages:
                                if msg.get("type") == "tool_call":
                                    logger.info(f"🔧 Tool call detected: {msg.get('content', '')}")

                            # Return the most recent non-empty assistant answer.
                            for msg in reversed(messages):
                                if msg.get("role") == "assistant" and msg.get("type") == "answer":
                                    content = msg.get("content", "")
                                    if content:
                                        return {
                                            "reply": content,
                                            "debug_info": debug_info,
                                        }

                            return {
                                "reply": "请回答上面的问题...",
                                "debug_info": debug_info,
                            }

                        # Any other status (in_progress, created): keep waiting.

                await asyncio.sleep(1)

        return {
            "reply": "响应超时,请重试。",
            "debug_info": debug_info,
        }

    async def _get_messages(
        self,
        conversation_id: str,
        chat_id: str,
    ) -> list:
        """
        Fetch the message list of a chat from ``/v3/chat/message/list``.

        Args:
            conversation_id: Conversation ID.
            chat_id: Chat ID.

        Returns:
            The ``data`` list of the response, or ``[]`` when the request
            does not return HTTP 200 with ``code`` 0 or carries no messages.
        """
        url = f"{self.base_url}/v3/chat/message/list"

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                url,
                params={
                    "conversation_id": conversation_id,
                    "chat_id": chat_id,
                },
                headers=self.headers,
            )

            if response.status_code == 200:
                data = response.json()
                logger.info(f"Messages response: {data}")
                if data.get("code") == 0:
                    # `or []` keeps callers safe from an explicit `"data": null`.
                    return data.get("data") or []

        return []


# Module-level singleton.
coze_service = CozeService()