diff --git a/agent/canvas.py b/agent/canvas.py index 7a1d3bd234..776e5764b1 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -137,6 +137,20 @@ class Graph: except Exception as e: logging.exception(e) + def close(self): + from common.mcp_tool_call_conn import MCPToolCallSession + seen = set() + for cpn in self.components.values(): + obj = cpn.get("obj") + if obj and hasattr(obj, "tools"): + for tool in obj.tools.values(): + if isinstance(tool, MCPToolCallSession) and id(tool) not in seen: + seen.add(id(tool)) + try: + tool.close_sync(timeout=3) + except Exception: + pass + def get_component_name(self, cid): for n in self.dsl.get("graph", {}).get("nodes", []): if cid == n["id"]: diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index 7fa55d16de..39b18a9f09 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -226,6 +226,8 @@ async def run(): logging.exception(e) canvas.cancel_task() yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n" + finally: + canvas.close() resp = Response(sse(), mimetype="text/event-stream") resp.headers.add_header("Cache-control", "no-cache") diff --git a/api/apps/mcp_server_app.py b/api/apps/mcp_server_app.py index 187560d626..f292b9c758 100644 --- a/api/apps/mcp_server_app.py +++ b/api/apps/mcp_server_app.py @@ -367,11 +367,12 @@ async def test_tool() -> Response: tool_call_sessions.append(tool_call_session) result = await thread_pool_exec(tool_call_session.tool_call, tool_name, arguments, timeout) - # PERF: blocking call to close sessions — consider moving to background thread or task queue - await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions) return get_json_result(data=result) except Exception as e: return server_error_response(e) + finally: + # PERF: blocking call to close sessions — consider moving to background thread or task queue + await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions) @manager.route("/cache_tools", methods=["POST"]) # noqa: F821 diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py index 99cb199004..ed75ef5571 100644 --- a/api/db/services/canvas_service.py +++ b/api/db/services/canvas_service.py @@ -233,15 +233,18 @@ async def completion(tenant_id, agent_id, session_id=None, **kwargs): "files": files }) txt = "" - async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): - ans["session_id"] = session_id - if ans["event"] == "message": - txt += ans["data"]["content"] - if ans["data"].get("start_to_think", False): - txt += "" - elif ans["data"].get("end_to_think", False): - txt += "" - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + try: + async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs): + ans["session_id"] = session_id + if ans["event"] == "message": + txt += ans["data"]["content"] + if ans["data"].get("start_to_think", False): + txt += "" + elif ans["data"].get("end_to_think", False): + txt += "" + yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + finally: + canvas.close() conv.message.append({"role": "assistant", "content": txt, "created_at": time.time(), "id": message_id}) conv.reference = canvas.get_reference() diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 9849a5c0eb..79cb4fd2c1 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -685,11 +685,12 @@ def get_mcp_tools(mcp_servers: list, timeout: float | int = 10) -> tuple[dict, s tool_dict["enabled"] = cached_tool.get("enabled", True) results[server_key].append(tool_dict) - # PERF: blocking call to close sessions — consider moving to background thread or task queue - close_multiple_mcp_toolcall_sessions(tool_call_sessions) return results, "" except Exception as e: return {}, str(e) + finally: + # PERF: blocking call to close sessions — consider moving to background thread or task queue + close_multiple_mcp_toolcall_sessions(tool_call_sessions) async def is_strong_enough(chat_model, embedding_model):