From 8de262e5c440c1bf2ed4faa998421dc8cf26d46e Mon Sep 17 00:00:00 2001
From: Br1an67 <932039080@qq.com>
Date: Mon, 2 Mar 2026 01:04:04 +0800
Subject: [PATCH] Fix: close MCP sessions after canvas execution to prevent
connection leaks
Add Graph.close() method that finds and closes all MCPToolCallSessions
held by Agent components. Call it in finally blocks after canvas.run()
in canvas_service.py and canvas_app.py. Also move session cleanup to
finally blocks in test_tool endpoint and get_mcp_tools to ensure
sessions are closed even on exceptions.
---
agent/canvas.py | 14 ++++++++++++++
api/apps/canvas_app.py | 2 ++
api/apps/mcp_server_app.py | 5 +++--
api/db/services/canvas_service.py | 21 ++++++++++++---------
api/utils/api_utils.py | 5 +++--
5 files changed, 34 insertions(+), 13 deletions(-)
diff --git a/agent/canvas.py b/agent/canvas.py
index 7a1d3bd234..776e5764b1 100644
--- a/agent/canvas.py
+++ b/agent/canvas.py
@@ -137,6 +137,20 @@ class Graph:
except Exception as e:
logging.exception(e)
+ def close(self):
+ from common.mcp_tool_call_conn import MCPToolCallSession
+ seen = set()
+ for cpn in self.components.values():
+ obj = cpn.get("obj")
+ if obj and hasattr(obj, "tools"):
+ for tool in obj.tools.values():
+ if isinstance(tool, MCPToolCallSession) and id(tool) not in seen:
+ seen.add(id(tool))
+ try:
+ tool.close_sync(timeout=3)
+ except Exception:
+ pass
+
def get_component_name(self, cid):
for n in self.dsl.get("graph", {}).get("nodes", []):
if cid == n["id"]:
diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py
index 7fa55d16de..39b18a9f09 100644
--- a/api/apps/canvas_app.py
+++ b/api/apps/canvas_app.py
@@ -226,6 +226,8 @@ async def run():
logging.exception(e)
canvas.cancel_task()
yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n"
+ finally:
+ canvas.close()
resp = Response(sse(), mimetype="text/event-stream")
resp.headers.add_header("Cache-control", "no-cache")
diff --git a/api/apps/mcp_server_app.py b/api/apps/mcp_server_app.py
index 187560d626..f292b9c758 100644
--- a/api/apps/mcp_server_app.py
+++ b/api/apps/mcp_server_app.py
@@ -367,11 +367,12 @@ async def test_tool() -> Response:
tool_call_sessions.append(tool_call_session)
result = await thread_pool_exec(tool_call_session.tool_call, tool_name, arguments, timeout)
- # PERF: blocking call to close sessions — consider moving to background thread or task queue
- await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
return get_json_result(data=result)
except Exception as e:
return server_error_response(e)
+ finally:
+ # PERF: blocking call to close sessions — consider moving to background thread or task queue
+ await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
@manager.route("/cache_tools", methods=["POST"]) # noqa: F821
diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py
index 99cb199004..ed75ef5571 100644
--- a/api/db/services/canvas_service.py
+++ b/api/db/services/canvas_service.py
@@ -233,15 +233,18 @@ async def completion(tenant_id, agent_id, session_id=None, **kwargs):
"files": files
})
txt = ""
- async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
- ans["session_id"] = session_id
- if ans["event"] == "message":
- txt += ans["data"]["content"]
- if ans["data"].get("start_to_think", False):
- txt += ""
- elif ans["data"].get("end_to_think", False):
- txt += ""
- yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
+ try:
+ async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
+ ans["session_id"] = session_id
+ if ans["event"] == "message":
+ txt += ans["data"]["content"]
+ if ans["data"].get("start_to_think", False):
+ txt += ""
+ elif ans["data"].get("end_to_think", False):
+ txt += ""
+ yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
+ finally:
+ canvas.close()
conv.message.append({"role": "assistant", "content": txt, "created_at": time.time(), "id": message_id})
conv.reference = canvas.get_reference()
diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py
index 9849a5c0eb..79cb4fd2c1 100644
--- a/api/utils/api_utils.py
+++ b/api/utils/api_utils.py
@@ -685,11 +685,12 @@ def get_mcp_tools(mcp_servers: list, timeout: float | int = 10) -> tuple[dict, s
tool_dict["enabled"] = cached_tool.get("enabled", True)
results[server_key].append(tool_dict)
- # PERF: blocking call to close sessions — consider moving to background thread or task queue
- close_multiple_mcp_toolcall_sessions(tool_call_sessions)
return results, ""
except Exception as e:
return {}, str(e)
+ finally:
+ # PERF: blocking call to close sessions — consider moving to background thread or task queue
+ close_multiple_mcp_toolcall_sessions(tool_call_sessions)
async def is_strong_enough(chat_model, embedding_model):