90 lines
2.6 KiB
Python
90 lines
2.6 KiB
Python
"""
|
||
测试工作流 C - 通用 SQL 查询(JSON 格式输入)
|
||
"""
|
||
import asyncio
|
||
import httpx
|
||
import os
|
||
import json
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv()
|
||
|
||
PAT_TOKEN = os.getenv("COZE_PAT_TOKEN")
|
||
WORKFLOW_ID = "7597376294612107318"
|
||
|
||
async def test_query(table: str, sql: str):
|
||
url = "https://api.coze.cn/v1/workflow/run"
|
||
headers = {
|
||
"Authorization": f"Bearer {PAT_TOKEN}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# JSON 格式输入
|
||
input_data = json.dumps({
|
||
"table": table,
|
||
"sql": sql
|
||
}, ensure_ascii=False)
|
||
|
||
payload = {
|
||
"workflow_id": WORKFLOW_ID,
|
||
"parameters": {
|
||
"input": input_data
|
||
}
|
||
}
|
||
|
||
print(f"Table: {table}")
|
||
print(f"SQL: {sql[:80]}...")
|
||
|
||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||
response = await client.post(url, json=payload, headers=headers)
|
||
print(f"Status: {response.status_code}")
|
||
|
||
data = response.json()
|
||
|
||
if data.get("code") == 0:
|
||
print(f"✅ 成功!")
|
||
result_str = data.get("data", "")
|
||
if result_str:
|
||
try:
|
||
inner_data = json.loads(result_str)
|
||
if isinstance(inner_data, list):
|
||
print(f"返回 {len(inner_data)} 条记录")
|
||
if inner_data:
|
||
print(json.dumps(inner_data[0], indent=2, ensure_ascii=False))
|
||
else:
|
||
print(json.dumps(inner_data, indent=2, ensure_ascii=False))
|
||
except:
|
||
print(f"Raw: {result_str[:300]}")
|
||
else:
|
||
print(f"❌ 失败: {data.get('msg')}")
|
||
if data.get("debug_url"):
|
||
print(f"Debug: {data.get('debug_url')}")
|
||
|
||
async def main():
|
||
print("=" * 60)
|
||
print("测试 1: 查询面试评估列表 (assessments)")
|
||
print("=" * 60)
|
||
await test_query(
|
||
"assessments",
|
||
"SELECT session_id, candidate_name, bstudio_create_time FROM ci_interview_assessments ORDER BY bstudio_create_time DESC LIMIT 5"
|
||
)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("测试 2: 查询对话日志 (logs)")
|
||
print("=" * 60)
|
||
await test_query(
|
||
"logs",
|
||
"SELECT log_id, session_id, stage, ai_question, user_answer FROM ci_interview_logs LIMIT 3"
|
||
)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("测试 3: 查询业务配置 (config)")
|
||
print("=" * 60)
|
||
await test_query(
|
||
"config",
|
||
"SELECT config_id, config_type, item_name FROM ci_business_config LIMIT 3"
|
||
)
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|