From febddd446ed49ebde7d6fd6e523418809aa8db35 Mon Sep 17 00:00:00 2001 From: Br1an67 <932039080@qq.com> Date: Mon, 2 Mar 2026 01:04:04 +0800 Subject: [PATCH] Fix: close MCP sessions after canvas execution to prevent connection leaks Add Graph.close() method that finds and closes all MCPToolCallSessions held by Agent components. Call it in finally blocks after canvas.run() in canvas_service.py and canvas_app.py. Also move session cleanup to finally blocks in test_tool endpoint and get_mcp_tools to ensure sessions are closed even on exceptions. --- agent/canvas.py | 14 ++++++++++++++ api/apps/canvas_app.py | 2 ++ api/apps/mcp_server_app.py | 5 +++-- api/db/services/canvas_service.py | 21 ++++++++++++--------- api/utils/api_utils.py | 5 +++-- 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index 7a1d3bd234..776e5764b1 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -137,6 +137,20 @@ class Graph: except Exception as e: logging.exception(e) + def close(self): + from common.mcp_tool_call_conn import MCPToolCallSession + seen = set() + for cpn in self.components.values(): + obj = cpn.get("obj") + if obj and hasattr(obj, "tools"): + for tool in obj.tools.values(): + if isinstance(tool, MCPToolCallSession) and id(tool) not in seen: + seen.add(id(tool)) + try: + tool.close_sync(timeout=3) + except Exception: + pass + def get_component_name(self, cid): for n in self.dsl.get("graph", {}).get("nodes", []): if cid == n["id"]: diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index 7fa55d16de..39b18a9f09 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -226,6 +226,8 @@ async def run(): logging.exception(e) canvas.cancel_task() yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n" + finally: + canvas.close() resp = Response(sse(), mimetype="text/event-stream") resp.headers.add_header("Cache-control", "no-cache") diff --git a/api/apps/mcp_server_app.py b/api/apps/mcp_server_app.py index 187560d626..f292b9c758 100644 --- a/api/apps/mcp_server_app.py +++ b/api/apps/mcp_server_app.py @@ -367,11 +367,12 @@ async def test_tool() -> Response: tool_call_sessions.append(tool_call_session) result = await thread_pool_exec(tool_call_session.tool_call, tool_name, arguments, timeout) - # PERF: blocking call to close sessions — consider moving to background thread or task queue - await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions) return get_json_result(data=result) except Exception as e: return server_error_response(e) + finally: + # PERF: blocking call to close sessions — consider moving to background thread or task queue + await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions) @manager.route("/cache_tools", methods=["POST"]) # noqa: F821 diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py index 99cb199004..ed75ef5571 100644 --- a/api/db/services/canvas_service.py +++ b/api/db/services/canvas_service.py @@ -233,15 +233,18 @@ async def completion(tenant_id, agent_id, session_id=None, **kwargs): "files": files }) txt = "" - async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): - ans["session_id"] = session_id - if ans["event"] == "message": - txt += ans["data"]["content"] - if ans["data"].get("start_to_think", False): - txt += "" - elif ans["data"].get("end_to_think", False): - txt += "" - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + try: + async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): + ans["session_id"] = session_id + if ans["event"] == "message": + txt += ans["data"]["content"] + if ans["data"].get("start_to_think", False): + txt += "" + elif ans["data"].get("end_to_think", False): + txt += "" + yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + finally: + canvas.close() conv.message.append({"role": "assistant", "content": txt, "created_at": time.time(), "id": message_id}) conv.reference = canvas.get_reference() diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 9849a5c0eb..79cb4fd2c1 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -685,11 +685,12 @@ def get_mcp_tools(mcp_servers: list, timeout: float | int = 10) -> tuple[dict, s tool_dict["enabled"] = cached_tool.get("enabled", True) results[server_key].append(tool_dict) - # PERF: blocking call to close sessions — consider moving to background thread or task queue - close_multiple_mcp_toolcall_sessions(tool_call_sessions) return results, "" except Exception as e: return {}, str(e) + finally: + # PERF: blocking call to close sessions — consider moving to background thread or task queue + close_multiple_mcp_toolcall_sessions(tool_call_sessions) async def is_strong_enough(chat_model, embedding_model):