Fix: close MCP sessions after canvas execution to prevent connection leaks

Add Graph.close() method that finds and closes all MCPToolCallSessions
held by Agent components. Call it in finally blocks after canvas.run()
in canvas_service.py and canvas_app.py. Also move session cleanup to
finally blocks in test_tool endpoint and get_mcp_tools to ensure
sessions are closed even on exceptions.
This commit is contained in:
Br1an67
2026-03-02 01:04:04 +08:00
parent 7e0dd906f2
commit 8de262e5c4
5 changed files with 34 additions and 13 deletions

View File

@@ -137,6 +137,20 @@ class Graph:
except Exception as e:
logging.exception(e)
def close(self):
from common.mcp_tool_call_conn import MCPToolCallSession
seen = set()
for cpn in self.components.values():
obj = cpn.get("obj")
if obj and hasattr(obj, "tools"):
for tool in obj.tools.values():
if isinstance(tool, MCPToolCallSession) and id(tool) not in seen:
seen.add(id(tool))
try:
tool.close_sync(timeout=3)
except Exception:
pass
def get_component_name(self, cid):
for n in self.dsl.get("graph", {}).get("nodes", []):
if cid == n["id"]:

View File

@@ -226,6 +226,8 @@ async def run():
logging.exception(e)
canvas.cancel_task()
yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n"
finally:
canvas.close()
resp = Response(sse(), mimetype="text/event-stream")
resp.headers.add_header("Cache-control", "no-cache")

View File

@@ -367,11 +367,12 @@ async def test_tool() -> Response:
tool_call_sessions.append(tool_call_session)
result = await thread_pool_exec(tool_call_session.tool_call, tool_name, arguments, timeout)
# PERF: blocking call to close sessions — consider moving to background thread or task queue
await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
return get_json_result(data=result)
except Exception as e:
return server_error_response(e)
finally:
# PERF: blocking call to close sessions — consider moving to background thread or task queue
await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
@manager.route("/cache_tools", methods=["POST"]) # noqa: F821

View File

@@ -233,15 +233,18 @@ async def completion(tenant_id, agent_id, session_id=None, **kwargs):
"files": files
})
txt = ""
async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
ans["session_id"] = session_id
if ans["event"] == "message":
txt += ans["data"]["content"]
if ans["data"].get("start_to_think", False):
txt += "<think>"
elif ans["data"].get("end_to_think", False):
txt += "</think>"
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
try:
async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
ans["session_id"] = session_id
if ans["event"] == "message":
txt += ans["data"]["content"]
if ans["data"].get("start_to_think", False):
txt += "<think>"
elif ans["data"].get("end_to_think", False):
txt += "</think>"
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
finally:
canvas.close()
conv.message.append({"role": "assistant", "content": txt, "created_at": time.time(), "id": message_id})
conv.reference = canvas.get_reference()

View File

@@ -685,11 +685,12 @@ def get_mcp_tools(mcp_servers: list, timeout: float | int = 10) -> tuple[dict, s
tool_dict["enabled"] = cached_tool.get("enabled", True)
results[server_key].append(tool_dict)
# PERF: blocking call to close sessions — consider moving to background thread or task queue
close_multiple_mcp_toolcall_sessions(tool_call_sessions)
return results, ""
except Exception as e:
return {}, str(e)
finally:
# PERF: blocking call to close sessions — consider moving to background thread or task queue
close_multiple_mcp_toolcall_sessions(tool_call_sessions)
async def is_strong_enough(chat_model, embedding_model):