fix(agentbot): aggregate structured output in non-streaming completions (#14848)

## What problem does this PR solve?

Closes #13384.

The `/api/v1/agentbots/<agent_id>/completions` non-streaming path
returned the first yielded SSE chunk and exited:

```python
async for answer in agent_completion(objs[0].tenant_id, agent_id, **req):
    return get_result(data=answer)
```

That meant structured output, the full assistant message, and reference
data were all dropped when an agent was called with `stream=false`.
Streaming worked because each event was forwarded individually;
non-streaming was returning a raw SSE-formatted string from a single
early event.

The v1 endpoint at
[`agent_api.py:1006-1050`](https://github.com/infiniflow/ragflow/blob/main/api/apps/restful_apis/agent_api.py#L1006-L1050)
already handles this correctly. This PR mirrors that aggregation in the
SDK beta endpoint: parse each SSE line, accumulate `content` from
`message` events, merge `reference`, collect `outputs.structured` from
each `node_finished` event keyed by `component_id`, and attach all of
them to the final response.

## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)

## Test plan

- [ ] Build an agent with a node that emits structured output, call
`POST /api/v1/agentbots/<agent_id>/completions` with `stream=false` and
a beta API token, verify `data.structured.<component_id>` is present in
the response.
- [ ] Same agent with `stream=true` — verify behavior is unchanged.
- [ ] Agent without structured output — verify `data.structured` is
omitted, `content` and `reference` still aggregated correctly.
This commit is contained in:
plind
2026-05-14 21:42:33 -07:00
committed by GitHub
parent 3a5df08c76
commit c9622d0924

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import json
import re
@@ -293,13 +294,56 @@ async def agent_bot_completions(agent_id):
return resp
try:
full_content = ""
reference = {}
structured_output = {}
final_ans = {}
async for answer in agent_completion(objs[0].tenant_id, agent_id, **req):
return get_result(data=answer)
# agent_completion yields SSE-formatted strings. A single yielded
# chunk can contain multiple "data:..." frames separated by "\n\n"
# plus blank or comment lines, so parse line-by-line rather than
# assuming one frame per chunk.
if not isinstance(answer, str):
continue
for line in answer.splitlines():
line = line.strip()
if not line.startswith("data:"):
continue
payload = line[len("data:"):].strip()
if not payload:
continue
try:
ans = json.loads(payload)
except Exception as e:
logging.debug("agent_bot_completions: skipping malformed SSE frame: %s", e)
continue
event = ans.get("event")
if event == "message":
full_content += ans.get("data", {}).get("content", "") or ""
if ans.get("data", {}).get("reference"):
reference.update(ans["data"]["reference"])
if event == "node_finished":
data = ans.get("data", {})
node_out = data.get("outputs") or {}
component_id = data.get("component_id")
if component_id is not None and "structured" in node_out:
structured_output[component_id] = copy.deepcopy(node_out["structured"])
final_ans = ans
if not final_ans:
return get_result(data={})
if "data" not in final_ans or not isinstance(final_ans["data"], dict):
final_ans["data"] = {}
final_ans["data"]["content"] = full_content
final_ans["data"]["reference"] = reference
if structured_output:
final_ans["data"]["structured"] = structured_output
return get_result(data=final_ans)
except Exception as e:
logging.exception(e)
return get_error_data_result(message=str(e) or "Unknown error")
return None
@manager.route("/agentbots/<agent_id>/inputs", methods=["GET"]) # noqa: F821
async def begin_inputs(agent_id):