From c9622d0924beb773e9fe5d28a288bcb020538e9b Mon Sep 17 00:00:00 2001 From: plind <59729252+plind-junior@users.noreply.github.com> Date: Thu, 14 May 2026 21:42:33 -0700 Subject: [PATCH] fix(agentbot): aggregate structured output in non-streaming completions (#14848) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What problem does this PR solve? Closes #13384. The `/api/v1/agentbots//completions` non-streaming path returned the first yielded SSE chunk and exited: ```python async for answer in agent_completion(objs[0].tenant_id, agent_id, **req): return get_result(data=answer) ``` That meant structured output, the full assistant message, and reference data were all dropped when an agent was called with `stream=false`. Streaming worked because each event was forwarded individually; non-streaming was returning a raw SSE-formatted string from a single early event. The v1 endpoint at [`agent_api.py:1006-1050`](https://github.com/infiniflow/ragflow/blob/main/api/apps/restful_apis/agent_api.py#L1006-L1050) already handles this correctly. This PR mirrors that aggregation in the SDK beta endpoint: parse each SSE line, accumulate `content` from `message` events, merge `reference`, collect `outputs.structured` from each `node_finished` event keyed by `component_id`, and attach all of them to the final response. ## Type of change - [x] Bug fix (non-breaking change which fixes an issue) ## Test plan - [ ] Build an agent with a node that emits structured output, call `POST /api/v1/agentbots//completions` with `stream=false` and a beta API token, verify `data.structured.` is present in the response. - [ ] Same agent with `stream=true` — verify behavior is unchanged. - [ ] Agent without structured output — verify `data.structured` is omitted, `content` and `reference` still aggregated correctly. --- api/apps/sdk/session.py | 48 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index 7ba6fbd81d..394ba71e90 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import copy import json import re @@ -293,13 +294,56 @@ async def agent_bot_completions(agent_id): return resp try: + full_content = "" + reference = {} + structured_output = {} + final_ans = {} async for answer in agent_completion(objs[0].tenant_id, agent_id, **req): - return get_result(data=answer) + # agent_completion yields SSE-formatted strings. A single yielded + # chunk can contain multiple "data:..." frames separated by "\n\n" + # plus blank or comment lines, so parse line-by-line rather than + # assuming one frame per chunk. + if not isinstance(answer, str): + continue + for line in answer.splitlines(): + line = line.strip() + if not line.startswith("data:"): + continue + payload = line[len("data:"):].strip() + if not payload: + continue + try: + ans = json.loads(payload) + except Exception as e: + logging.debug("agent_bot_completions: skipping malformed SSE frame: %s", e) + continue + event = ans.get("event") + if event == "message": + full_content += ans.get("data", {}).get("content", "") or "" + if ans.get("data", {}).get("reference"): + reference.update(ans["data"]["reference"]) + if event == "node_finished": + data = ans.get("data", {}) + node_out = data.get("outputs") or {} + component_id = data.get("component_id") + if component_id is not None and "structured" in node_out: + structured_output[component_id] = copy.deepcopy(node_out["structured"]) + final_ans = ans + + if not final_ans: + return get_result(data={}) + + if "data" not in final_ans or not isinstance(final_ans["data"], dict): + final_ans["data"] = {} + final_ans["data"]["content"] = full_content + final_ans["data"]["reference"] = reference + if structured_output: + final_ans["data"]["structured"] = structured_output + return get_result(data=final_ans) except Exception as e: logging.exception(e) return get_error_data_result(message=str(e) or "Unknown error") - return None @manager.route("/agentbots//inputs", methods=["GET"]) # noqa: F821 async def begin_inputs(agent_id):