From 38f8f8a65636d7201f7afbc0d4aeed225181a204 Mon Sep 17 00:00:00 2001 From: RazmikGevorgyan <48706091+RazmikGevorgyan@users.noreply.github.com> Date: Wed, 1 Jul 2026 05:33:41 +0400 Subject: [PATCH] =?UTF-8?q?fix:=20handle=20non-serializable=20objects=20in?= =?UTF-8?q?=20agent=20canvas=20SSE=20and=20state=20se=E2=80=A6=20(#14210)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …rialization Agent components (llm.py, agent_with_tools.py, message.py) store functools.partial objects as deferred streaming handles in their output slots. When the canvas state gets serialized for SSE events, Redis commits, or logging, these partials — plus non-copyable objects like Langfuse clients — crash json.dumps and deepcopy. Changes: - agent_api.py: pass a _canvas_json_default fallback to json.dumps for SSE event serialization; it maps callables (e.g. functools.partial) to None and still raises TypeError for any other non-serializable type - canvas.py: wrap deepcopy calls in Graph.__str__ in try/except to handle non-copyable objects (Langfuse clients, etc.), logging a warning and falling back to a shallow reference; pass a default= handler to the final json.dumps that maps callables to None and falls back to str() (with a warning) for other types - base.py: pass the same kind of default= handler (callables to None, str() fallback with a warning) to json.dumps in ComponentParamBase.__str__ to handle non-serializable objects in component parameters Closes #14229 ### What problem does this PR solve? _Briefly describe what this PR aims to solve.
Include background context that will help reviewers understand the purpose of the PR._ ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [ ] New Feature (non-breaking change which adds functionality) - [ ] Documentation Update - [ ] Refactoring - [ ] Performance Improvement - [ ] Other (please describe): Co-authored-by: Claude Opus 4.6 (1M context) Co-authored-by: yzc --- agent/canvas.py | 19 ++++++++++++++++--- agent/component/base.py | 7 ++++++- api/apps/restful_apis/agent_api.py | 18 ++++++++++++++++-- 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index 86fd8689ba..15f5cf0449 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -121,7 +121,11 @@ class Graph: for k in self.dsl.keys(): if k in ["components"]: continue - dsl[k] = deepcopy(self.dsl[k]) + try: + dsl[k] = deepcopy(self.dsl[k]) + except Exception as e: + logging.warning("Graph.__str__: deepcopy failed for dsl key '%s' (type=%s): %s. Using shallow reference.", k, type(self.dsl[k]).__name__, e) + dsl[k] = self.dsl[k] for k, cpn in self.components.items(): if k not in dsl["components"]: @@ -130,8 +134,17 @@ class Graph: if c == "obj": dsl["components"][k][c] = json.loads(str(cpn["obj"])) continue - dsl["components"][k][c] = deepcopy(cpn[c]) - return json.dumps(dsl, ensure_ascii=False) + try: + dsl["components"][k][c] = deepcopy(cpn[c]) + except Exception as e: + logging.warning("Graph.__str__: deepcopy failed for component '%s' key '%s' (type=%s): %s. 
Using shallow reference.", k, c, type(cpn[c]).__name__, e) + dsl["components"][k][c] = cpn[c] + def _serialize_default(obj): + if callable(obj): + return None + logging.warning("Graph.__str__: JSON fallback via str() for type=%s", type(obj).__name__) + return str(obj) + return json.dumps(dsl, ensure_ascii=False, default=_serialize_default) def reset(self): self.path = [] diff --git a/agent/component/base.py b/agent/component/base.py index c0ddea168a..a91bec70ef 100644 --- a/agent/component/base.py +++ b/agent/component/base.py @@ -96,7 +96,12 @@ class ComponentParamBase(ABC): return {name: True for name in self.get_feeded_deprecated_params()} def __str__(self): - return json.dumps(self.as_dict(), ensure_ascii=False) + def _serialize_default(obj): + if callable(obj): + return None + logging.warning("ComponentParamBase.__str__: JSON fallback via str() for type=%s", type(obj).__name__) + return str(obj) + return json.dumps(self.as_dict(), ensure_ascii=False, default=_serialize_default) def as_dict(self): def _recursive_convert_obj_to_dict(obj): diff --git a/api/apps/restful_apis/agent_api.py b/api/apps/restful_apis/agent_api.py index 859719bba8..8ae6723709 100644 --- a/api/apps/restful_apis/agent_api.py +++ b/api/apps/restful_apis/agent_api.py @@ -71,6 +71,20 @@ from peewee import MySQLDatabase, PostgresqlDatabase _background_tasks: Set[asyncio.Task] = set() +def _canvas_json_default(obj): + """Fallback serializer for canvas SSE events. + + Agent components store functools.partial objects as deferred streaming + handles (see llm.py, agent_with_tools.py, message.py). These leak into + SSE event dicts via component input/output propagation and are not + JSON-serializable. This handler converts them to None so that downstream + consumers never receive opaque ``str(partial(...))`` representations. 
+ """ + if callable(obj): + return None + raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable") + + def _require_canvas_access_sync(func): @wraps(func) def wrapper(*args, **kwargs): @@ -312,7 +326,7 @@ async def _run_workflow_session( } ) final_ans = ans - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + yield "data:" + json.dumps(ans, ensure_ascii=False, default=_canvas_json_default) + "\n\n" if final_ans: if "data" not in final_ans or not isinstance(final_ans["data"], dict): @@ -1576,7 +1590,7 @@ async def agent_chat_completion(tenant_id, agent_id=None): emitted = False async for ans in _iter_session_completion_events(tenant_id, agent_id, req, return_trace): emitted = True - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + yield "data:" + json.dumps(ans, ensure_ascii=False, default=_canvas_json_default) + "\n\n" if not emitted: # Parity with the new-session SSE path: if the canvas yields # no events on an existing session (e.g. empty query), still