diff --git a/agent/canvas.py b/agent/canvas.py
index 7a1d3bd234..776e5764b1 100644
--- a/agent/canvas.py
+++ b/agent/canvas.py
@@ -137,6 +137,20 @@ class Graph:
except Exception as e:
logging.exception(e)
+ def close(self):
+ from common.mcp_tool_call_conn import MCPToolCallSession
+ seen = set()
+ for cpn in self.components.values():
+ obj = cpn.get("obj")
+ if obj and hasattr(obj, "tools"):
+ for tool in obj.tools.values():
+ if isinstance(tool, MCPToolCallSession) and id(tool) not in seen:
+ seen.add(id(tool))
+ try:
+ tool.close_sync(timeout=3)
+ except Exception:
+ pass
+
def get_component_name(self, cid):
for n in self.dsl.get("graph", {}).get("nodes", []):
if cid == n["id"]:
diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py
index 7fa55d16de..39b18a9f09 100644
--- a/api/apps/canvas_app.py
+++ b/api/apps/canvas_app.py
@@ -226,6 +226,8 @@ async def run():
logging.exception(e)
canvas.cancel_task()
yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n"
+ finally:
+ canvas.close()
resp = Response(sse(), mimetype="text/event-stream")
resp.headers.add_header("Cache-control", "no-cache")
diff --git a/api/apps/mcp_server_app.py b/api/apps/mcp_server_app.py
index 187560d626..f292b9c758 100644
--- a/api/apps/mcp_server_app.py
+++ b/api/apps/mcp_server_app.py
@@ -367,11 +367,12 @@ async def test_tool() -> Response:
tool_call_sessions.append(tool_call_session)
result = await thread_pool_exec(tool_call_session.tool_call, tool_name, arguments, timeout)
- # PERF: blocking call to close sessions — consider moving to background thread or task queue
- await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
return get_json_result(data=result)
except Exception as e:
return server_error_response(e)
+ finally:
+ # PERF: blocking call to close sessions — consider moving to background thread or task queue
+ await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
@manager.route("/cache_tools", methods=["POST"]) # noqa: F821
diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py
index 99cb199004..ed75ef5571 100644
--- a/api/db/services/canvas_service.py
+++ b/api/db/services/canvas_service.py
@@ -233,15 +233,18 @@ async def completion(tenant_id, agent_id, session_id=None, **kwargs):
"files": files
})
txt = ""
- async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
- ans["session_id"] = session_id
- if ans["event"] == "message":
- txt += ans["data"]["content"]
- if ans["data"].get("start_to_think", False):
- txt += ""
- elif ans["data"].get("end_to_think", False):
- txt += ""
- yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
+ try:
+ async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
+ ans["session_id"] = session_id
+ if ans["event"] == "message":
+ txt += ans["data"]["content"]
+ if ans["data"].get("start_to_think", False):
+ txt += ""
+ elif ans["data"].get("end_to_think", False):
+ txt += ""
+ yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
+ finally:
+ canvas.close()
conv.message.append({"role": "assistant", "content": txt, "created_at": time.time(), "id": message_id})
conv.reference = canvas.get_reference()
diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py
index 9849a5c0eb..79cb4fd2c1 100644
--- a/api/utils/api_utils.py
+++ b/api/utils/api_utils.py
@@ -685,11 +685,12 @@ def get_mcp_tools(mcp_servers: list, timeout: float | int = 10) -> tuple[dict, s
tool_dict["enabled"] = cached_tool.get("enabled", True)
results[server_key].append(tool_dict)
- # PERF: blocking call to close sessions — consider moving to background thread or task queue
- close_multiple_mcp_toolcall_sessions(tool_call_sessions)
return results, ""
except Exception as e:
return {}, str(e)
+ finally:
+ # PERF: blocking call to close sessions — consider moving to background thread or task queue
+ close_multiple_mcp_toolcall_sessions(tool_call_sessions)
async def is_strong_enough(chat_model, embedding_model):