""" 测试插入单条对话记录 """ import asyncio import httpx import json COZE_API_BASE = "https://api.coze.cn" COZE_PAT_TOKEN = "pat_nd1wU47WyPS9GCIyJ1clnH8h1WOQXGrYELX8w73TnSZaYbFdYD4swIhzcETBUbfT" WORKFLOW_ID = "7597376294612107318" async def execute_sql(sql: str, table: str) -> dict: """通过工作流执行 SQL""" headers = { "Authorization": f"Bearer {COZE_PAT_TOKEN}", "Content-Type": "application/json" } input_json = json.dumps({"table": table, "sql": sql}, ensure_ascii=False) print(f"\n📤 发送请求:") print(f" table: {table}") print(f" sql: {sql[:100]}...") print(f" input: {input_json[:200]}...") payload = { "workflow_id": WORKFLOW_ID, "parameters": { "input": input_json } } async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( f"{COZE_API_BASE}/v1/workflow/run", headers=headers, json=payload ) result = response.json() print(f"\n📥 响应:") print(f" code: {result.get('code')}") print(f" msg: {result.get('msg')}") if result.get('data'): print(f" data: {result.get('data')[:200]}...") if result.get('debug_url'): print(f" debug: {result.get('debug_url')}") return result async def main(): print("=" * 60) print("测试 INSERT 到 ci_interview_logs") print("=" * 60) # 测试 1: 简单的 INSERT sql1 = """INSERT INTO ci_interview_logs (session_id, stage, round, ai_question, user_answer, log_type) VALUES ('TEST_001', '测试', '1', '测试问题', '测试回答', 'test')""" print("\n\n🧪 测试 1: 简单 INSERT") result = await execute_sql(sql1, "logs") # 测试 2: 查询刚插入的数据 sql2 = """SELECT * FROM ci_interview_logs WHERE session_id = 'TEST_001' LIMIT 5""" print("\n\n🧪 测试 2: 查询刚插入的数据") result = await execute_sql(sql2, "logs") # 测试 3: 删除测试数据 sql3 = """DELETE FROM ci_interview_logs WHERE session_id = 'TEST_001'""" print("\n\n🧪 测试 3: 删除测试数据") result = await execute_sql(sql3, "logs") print("\n" + "=" * 60) print("测试完成") print("=" * 60) if __name__ == "__main__": asyncio.run(main())