From faae91d34fde8c371fb50ac63e16bc0d15033932 Mon Sep 17 00:00:00 2001 From: buua436 Date: Wed, 6 May 2026 20:29:15 +0800 Subject: [PATCH] Fix: support non-stream runtime agent completion (#14596) ### What problem does this PR solve? support non-stream runtime agent completion ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- api/apps/restful_apis/agent_api.py | 120 +++++++++++++++++++++-------- 1 file changed, 90 insertions(+), 30 deletions(-) diff --git a/api/apps/restful_apis/agent_api.py b/api/apps/restful_apis/agent_api.py index 62b4a4ab22..3e0c68c328 100644 --- a/api/apps/restful_apis/agent_api.py +++ b/api/apps/restful_apis/agent_api.py @@ -998,37 +998,87 @@ async def agent_chat_completion(tenant_id, agent_id=None): except Exception as exc: return server_error_response(exc) - async def sse(): - nonlocal canvas - try: - async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" - - commit_ok = CanvasReplicaService.commit_after_run( - canvas_id=agent_id, - tenant_id=str(tenant_id), - runtime_user_id=user_id, - dsl=json.loads(str(canvas)), - canvas_category=canvas_category, - title=canvas_title, + async def commit_runtime_replica(): + commit_ok = CanvasReplicaService.commit_after_run( + canvas_id=agent_id, + tenant_id=str(tenant_id), + runtime_user_id=user_id, + dsl=json.loads(str(canvas)), + canvas_category=canvas_category, + title=canvas_title, + ) + if not commit_ok: + logging.error( + "Canvas runtime replica commit failed: canvas_id=%s tenant_id=%s runtime_user_id=%s", + agent_id, + tenant_id, + user_id, ) - if not commit_ok: - logging.error( - "Canvas runtime replica commit failed: canvas_id=%s tenant_id=%s runtime_user_id=%s", - agent_id, - tenant_id, - user_id, + + if req.get("stream", True): + async def sse(): + nonlocal canvas + try: + async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): + yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + + await commit_runtime_replica() + except Exception as exc: + logging.exception(exc) + canvas.cancel_task() + yield ( + "data:" + + json.dumps({"code": 500, "message": str(exc), "data": False}, ensure_ascii=False) + + "\n\n" ) - except Exception as exc: - logging.exception(exc) - canvas.cancel_task() - yield ( - "data:" - + json.dumps({"code": 500, "message": str(exc), "data": False}, ensure_ascii=False) - + "\n\n" - ) - return _build_sse_response(sse()) + return _build_sse_response(sse()) + + full_content = "" + reference = {} + final_ans = {} + trace_items = [] + structured_output = {} + try: + async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): + if ans.get("event") == "message": + full_content += ans.get("data", {}).get("content", "") + if ans.get("data", {}).get("reference", None): + reference.update(ans["data"]["reference"]) + if ans.get("event") == "node_finished": + data = ans.get("data", {}) + node_out = data.get("outputs", {}) + component_id = data.get("component_id") + if component_id is not None and "structured" in node_out: + structured_output[component_id] = copy.deepcopy(node_out["structured"]) + if req.get("return_trace", False): + trace_items.append( + { + "component_id": data.get("component_id"), + "trace": [copy.deepcopy(data)], + } + ) + final_ans = ans + except Exception as exc: + logging.exception(exc) + canvas.cancel_task() + return get_result(data=f"**ERROR**: {str(exc)}") + + if not final_ans: + await commit_runtime_replica() + return get_result(data={}) + + if "data" not in final_ans or not isinstance(final_ans["data"], dict): + final_ans["data"] = {} + final_ans["data"]["content"] = full_content + final_ans["data"]["reference"] = reference + if structured_output: + final_ans["data"]["structured"] = structured_output + if trace_items: + final_ans["data"]["trace"] = trace_items + + await commit_runtime_replica() + return get_result(data=final_ans) return_trace = bool(req.get("return_trace", False)) if req.get("stream", True): @@ -1042,7 +1092,7 @@ async def agent_chat_completion(tenant_id, agent_id=None): full_content = "" reference = {} - final_ans = "" + final_ans = {} trace_items = [] structured_output = {} async for ans in _iter_session_completion_events(tenant_id, agent_id, req, return_trace): @@ -1058,11 +1108,21 @@ async def agent_chat_completion(tenant_id, agent_id=None): if component_id is not None and "structured" in node_out: structured_output[component_id] = copy.deepcopy(node_out["structured"]) if return_trace: - trace_items = ans.get("data", {}).get("trace", trace_items) + trace_items.append( + { + "component_id": data.get("component_id"), + "trace": [copy.deepcopy(data)], + } + ) final_ans = ans except Exception as exc: return get_result(data=f"**ERROR**: {str(exc)}") + if not final_ans: + return get_result(data={}) + + if "data" not in final_ans or not isinstance(final_ans["data"], dict): + final_ans["data"] = {} final_ans["data"]["content"] = full_content final_ans["data"]["reference"] = reference if structured_output: