mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-01 16:25:44 +08:00
fix: handle non-serializable objects in agent canvas SSE and state se… (#14210)
…rialization Agent components (llm.py, agent_with_tools.py, message.py) store functools.partial objects as deferred streaming handles in their output slots. When the canvas state gets serialized for SSE events, Redis commits, or logging, these partials — plus non-copyable objects like Langfuse clients — crash json.dumps and deepcopy. Changes: - canvas_app.py: add default=str to json.dumps for SSE event serialization (lines 238, 296) - canvas.py: wrap deepcopy calls in try/except to handle non-copyable objects (Langfuse clients, etc.), add default=str to final json.dumps - base.py: add default=str to ComponentParamBase.__str__ to handle non-serializable objects in component parameters Closes #14229 ### What problem does this PR solve? _Briefly describe what this PR aims to solve. Include background context that will help reviewers understand the purpose of the PR._ ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [ ] New Feature (non-breaking change which adds functionality) - [ ] Documentation Update - [ ] Refactoring - [ ] Performance Improvement - [ ] Other (please describe): Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: yzc <yuzhichang@gmail.com>
This commit is contained in:
@@ -121,7 +121,11 @@ class Graph:
|
||||
for k in self.dsl.keys():
|
||||
if k in ["components"]:
|
||||
continue
|
||||
dsl[k] = deepcopy(self.dsl[k])
|
||||
try:
|
||||
dsl[k] = deepcopy(self.dsl[k])
|
||||
except Exception as e:
|
||||
logging.warning("Graph.__str__: deepcopy failed for dsl key '%s' (type=%s): %s. Using shallow reference.", k, type(self.dsl[k]).__name__, e)
|
||||
dsl[k] = self.dsl[k]
|
||||
|
||||
for k, cpn in self.components.items():
|
||||
if k not in dsl["components"]:
|
||||
@@ -130,8 +134,17 @@ class Graph:
|
||||
if c == "obj":
|
||||
dsl["components"][k][c] = json.loads(str(cpn["obj"]))
|
||||
continue
|
||||
dsl["components"][k][c] = deepcopy(cpn[c])
|
||||
return json.dumps(dsl, ensure_ascii=False)
|
||||
try:
|
||||
dsl["components"][k][c] = deepcopy(cpn[c])
|
||||
except Exception as e:
|
||||
logging.warning("Graph.__str__: deepcopy failed for component '%s' key '%s' (type=%s): %s. Using shallow reference.", k, c, type(cpn[c]).__name__, e)
|
||||
dsl["components"][k][c] = cpn[c]
|
||||
def _serialize_default(obj):
|
||||
if callable(obj):
|
||||
return None
|
||||
logging.warning("Graph.__str__: JSON fallback via str() for type=%s", type(obj).__name__)
|
||||
return str(obj)
|
||||
return json.dumps(dsl, ensure_ascii=False, default=_serialize_default)
|
||||
|
||||
def reset(self):
|
||||
self.path = []
|
||||
|
||||
@@ -96,7 +96,12 @@ class ComponentParamBase(ABC):
|
||||
return {name: True for name in self.get_feeded_deprecated_params()}
|
||||
|
||||
def __str__(self):
|
||||
return json.dumps(self.as_dict(), ensure_ascii=False)
|
||||
def _serialize_default(obj):
|
||||
if callable(obj):
|
||||
return None
|
||||
logging.warning("ComponentParamBase.__str__: JSON fallback via str() for type=%s", type(obj).__name__)
|
||||
return str(obj)
|
||||
return json.dumps(self.as_dict(), ensure_ascii=False, default=_serialize_default)
|
||||
|
||||
def as_dict(self):
|
||||
def _recursive_convert_obj_to_dict(obj):
|
||||
|
||||
@@ -71,6 +71,20 @@ from peewee import MySQLDatabase, PostgresqlDatabase
|
||||
_background_tasks: Set[asyncio.Task] = set()
|
||||
|
||||
|
||||
def _canvas_json_default(obj):
|
||||
"""Fallback serializer for canvas SSE events.
|
||||
|
||||
Agent components store functools.partial objects as deferred streaming
|
||||
handles (see llm.py, agent_with_tools.py, message.py). These leak into
|
||||
SSE event dicts via component input/output propagation and are not
|
||||
JSON-serializable. This handler converts them to None so that downstream
|
||||
consumers never receive opaque ``str(partial(...))`` representations.
|
||||
"""
|
||||
if callable(obj):
|
||||
return None
|
||||
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
|
||||
|
||||
|
||||
def _require_canvas_access_sync(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
@@ -312,7 +326,7 @@ async def _run_workflow_session(
|
||||
}
|
||||
)
|
||||
final_ans = ans
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False, default=_canvas_json_default) + "\n\n"
|
||||
|
||||
if final_ans:
|
||||
if "data" not in final_ans or not isinstance(final_ans["data"], dict):
|
||||
@@ -1576,7 +1590,7 @@ async def agent_chat_completion(tenant_id, agent_id=None):
|
||||
emitted = False
|
||||
async for ans in _iter_session_completion_events(tenant_id, agent_id, req, return_trace):
|
||||
emitted = True
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False, default=_canvas_json_default) + "\n\n"
|
||||
if not emitted:
|
||||
# Parity with the new-session SSE path: if the canvas yields
|
||||
# no events on an existing session (e.g. empty query), still
|
||||
|
||||
Reference in New Issue
Block a user