79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
"""
|
|
测试插入单条对话记录
|
|
"""
|
|
import asyncio
|
|
import httpx
|
|
import json
|
|
|
|
COZE_API_BASE = "https://api.coze.cn"
|
|
COZE_PAT_TOKEN = "pat_nd1wU47WyPS9GCIyJ1clnH8h1WOQXGrYELX8w73TnSZaYbFdYD4swIhzcETBUbfT"
|
|
WORKFLOW_ID = "7597376294612107318"
|
|
|
|
async def execute_sql(sql: str, table: str) -> dict:
|
|
"""通过工作流执行 SQL"""
|
|
headers = {
|
|
"Authorization": f"Bearer {COZE_PAT_TOKEN}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
input_json = json.dumps({"table": table, "sql": sql}, ensure_ascii=False)
|
|
print(f"\n📤 发送请求:")
|
|
print(f" table: {table}")
|
|
print(f" sql: {sql[:100]}...")
|
|
print(f" input: {input_json[:200]}...")
|
|
|
|
payload = {
|
|
"workflow_id": WORKFLOW_ID,
|
|
"parameters": {
|
|
"input": input_json
|
|
}
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=60.0) as client:
|
|
response = await client.post(
|
|
f"{COZE_API_BASE}/v1/workflow/run",
|
|
headers=headers,
|
|
json=payload
|
|
)
|
|
result = response.json()
|
|
print(f"\n📥 响应:")
|
|
print(f" code: {result.get('code')}")
|
|
print(f" msg: {result.get('msg')}")
|
|
if result.get('data'):
|
|
print(f" data: {result.get('data')[:200]}...")
|
|
if result.get('debug_url'):
|
|
print(f" debug: {result.get('debug_url')}")
|
|
return result
|
|
|
|
|
|
async def main():
|
|
print("=" * 60)
|
|
print("测试 INSERT 到 ci_interview_logs")
|
|
print("=" * 60)
|
|
|
|
# 测试 1: 简单的 INSERT
|
|
sql1 = """INSERT INTO ci_interview_logs (session_id, stage, round, ai_question, user_answer, log_type) VALUES ('TEST_001', '测试', '1', '测试问题', '测试回答', 'test')"""
|
|
|
|
print("\n\n🧪 测试 1: 简单 INSERT")
|
|
result = await execute_sql(sql1, "logs")
|
|
|
|
# 测试 2: 查询刚插入的数据
|
|
sql2 = """SELECT * FROM ci_interview_logs WHERE session_id = 'TEST_001' LIMIT 5"""
|
|
|
|
print("\n\n🧪 测试 2: 查询刚插入的数据")
|
|
result = await execute_sql(sql2, "logs")
|
|
|
|
# 测试 3: 删除测试数据
|
|
sql3 = """DELETE FROM ci_interview_logs WHERE session_id = 'TEST_001'"""
|
|
|
|
print("\n\n🧪 测试 3: 删除测试数据")
|
|
result = await execute_sql(sql3, "logs")
|
|
|
|
print("\n" + "=" * 60)
|
|
print("测试完成")
|
|
print("=" * 60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|