mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-01 08:15:44 +08:00
Fix: support non-stream runtime agent completion (#14596)
### What problem does this PR solve? support non-stream runtime agent completion ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -998,37 +998,87 @@ async def agent_chat_completion(tenant_id, agent_id=None):
|
||||
except Exception as exc:
|
||||
return server_error_response(exc)
|
||||
|
||||
async def sse():
|
||||
nonlocal canvas
|
||||
try:
|
||||
async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
|
||||
|
||||
commit_ok = CanvasReplicaService.commit_after_run(
|
||||
canvas_id=agent_id,
|
||||
tenant_id=str(tenant_id),
|
||||
runtime_user_id=user_id,
|
||||
dsl=json.loads(str(canvas)),
|
||||
canvas_category=canvas_category,
|
||||
title=canvas_title,
|
||||
async def commit_runtime_replica():
|
||||
commit_ok = CanvasReplicaService.commit_after_run(
|
||||
canvas_id=agent_id,
|
||||
tenant_id=str(tenant_id),
|
||||
runtime_user_id=user_id,
|
||||
dsl=json.loads(str(canvas)),
|
||||
canvas_category=canvas_category,
|
||||
title=canvas_title,
|
||||
)
|
||||
if not commit_ok:
|
||||
logging.error(
|
||||
"Canvas runtime replica commit failed: canvas_id=%s tenant_id=%s runtime_user_id=%s",
|
||||
agent_id,
|
||||
tenant_id,
|
||||
user_id,
|
||||
)
|
||||
if not commit_ok:
|
||||
logging.error(
|
||||
"Canvas runtime replica commit failed: canvas_id=%s tenant_id=%s runtime_user_id=%s",
|
||||
agent_id,
|
||||
tenant_id,
|
||||
user_id,
|
||||
|
||||
if req.get("stream", True):
|
||||
async def sse():
|
||||
nonlocal canvas
|
||||
try:
|
||||
async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
|
||||
|
||||
await commit_runtime_replica()
|
||||
except Exception as exc:
|
||||
logging.exception(exc)
|
||||
canvas.cancel_task()
|
||||
yield (
|
||||
"data:"
|
||||
+ json.dumps({"code": 500, "message": str(exc), "data": False}, ensure_ascii=False)
|
||||
+ "\n\n"
|
||||
)
|
||||
except Exception as exc:
|
||||
logging.exception(exc)
|
||||
canvas.cancel_task()
|
||||
yield (
|
||||
"data:"
|
||||
+ json.dumps({"code": 500, "message": str(exc), "data": False}, ensure_ascii=False)
|
||||
+ "\n\n"
|
||||
)
|
||||
|
||||
return _build_sse_response(sse())
|
||||
return _build_sse_response(sse())
|
||||
|
||||
full_content = ""
|
||||
reference = {}
|
||||
final_ans = {}
|
||||
trace_items = []
|
||||
structured_output = {}
|
||||
try:
|
||||
async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
|
||||
if ans.get("event") == "message":
|
||||
full_content += ans.get("data", {}).get("content", "")
|
||||
if ans.get("data", {}).get("reference", None):
|
||||
reference.update(ans["data"]["reference"])
|
||||
if ans.get("event") == "node_finished":
|
||||
data = ans.get("data", {})
|
||||
node_out = data.get("outputs", {})
|
||||
component_id = data.get("component_id")
|
||||
if component_id is not None and "structured" in node_out:
|
||||
structured_output[component_id] = copy.deepcopy(node_out["structured"])
|
||||
if req.get("return_trace", False):
|
||||
trace_items.append(
|
||||
{
|
||||
"component_id": data.get("component_id"),
|
||||
"trace": [copy.deepcopy(data)],
|
||||
}
|
||||
)
|
||||
final_ans = ans
|
||||
except Exception as exc:
|
||||
logging.exception(exc)
|
||||
canvas.cancel_task()
|
||||
return get_result(data=f"**ERROR**: {str(exc)}")
|
||||
|
||||
if not final_ans:
|
||||
await commit_runtime_replica()
|
||||
return get_result(data={})
|
||||
|
||||
if "data" not in final_ans or not isinstance(final_ans["data"], dict):
|
||||
final_ans["data"] = {}
|
||||
final_ans["data"]["content"] = full_content
|
||||
final_ans["data"]["reference"] = reference
|
||||
if structured_output:
|
||||
final_ans["data"]["structured"] = structured_output
|
||||
if trace_items:
|
||||
final_ans["data"]["trace"] = trace_items
|
||||
|
||||
await commit_runtime_replica()
|
||||
return get_result(data=final_ans)
|
||||
|
||||
return_trace = bool(req.get("return_trace", False))
|
||||
if req.get("stream", True):
|
||||
@@ -1042,7 +1092,7 @@ async def agent_chat_completion(tenant_id, agent_id=None):
|
||||
|
||||
full_content = ""
|
||||
reference = {}
|
||||
final_ans = ""
|
||||
final_ans = {}
|
||||
trace_items = []
|
||||
structured_output = {}
|
||||
async for ans in _iter_session_completion_events(tenant_id, agent_id, req, return_trace):
|
||||
@@ -1058,11 +1108,21 @@ async def agent_chat_completion(tenant_id, agent_id=None):
|
||||
if component_id is not None and "structured" in node_out:
|
||||
structured_output[component_id] = copy.deepcopy(node_out["structured"])
|
||||
if return_trace:
|
||||
trace_items = ans.get("data", {}).get("trace", trace_items)
|
||||
trace_items.append(
|
||||
{
|
||||
"component_id": data.get("component_id"),
|
||||
"trace": [copy.deepcopy(data)],
|
||||
}
|
||||
)
|
||||
final_ans = ans
|
||||
except Exception as exc:
|
||||
return get_result(data=f"**ERROR**: {str(exc)}")
|
||||
|
||||
if not final_ans:
|
||||
return get_result(data={})
|
||||
|
||||
if "data" not in final_ans or not isinstance(final_ans["data"], dict):
|
||||
final_ans["data"] = {}
|
||||
final_ans["data"]["content"] = full_content
|
||||
final_ans["data"]["reference"] = reference
|
||||
if structured_output:
|
||||
|
||||
Reference in New Issue
Block a user