mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-01 16:25:44 +08:00
Fix: close MCP sessions after canvas execution to prevent connection leaks
Add Graph.close() method that finds and closes all MCPToolCallSessions held by Agent components. Call it in finally blocks after canvas.run() in canvas_service.py and canvas_app.py. Also move session cleanup to finally blocks in test_tool endpoint and get_mcp_tools to ensure sessions are closed even on exceptions.
This commit is contained in:
@@ -137,6 +137,20 @@ class Graph:
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
|
||||
def close(self):
|
||||
from common.mcp_tool_call_conn import MCPToolCallSession
|
||||
seen = set()
|
||||
for cpn in self.components.values():
|
||||
obj = cpn.get("obj")
|
||||
if obj and hasattr(obj, "tools"):
|
||||
for tool in obj.tools.values():
|
||||
if isinstance(tool, MCPToolCallSession) and id(tool) not in seen:
|
||||
seen.add(id(tool))
|
||||
try:
|
||||
tool.close_sync(timeout=3)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def get_component_name(self, cid):
|
||||
for n in self.dsl.get("graph", {}).get("nodes", []):
|
||||
if cid == n["id"]:
|
||||
|
||||
@@ -226,6 +226,8 @@ async def run():
|
||||
logging.exception(e)
|
||||
canvas.cancel_task()
|
||||
yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n"
|
||||
finally:
|
||||
canvas.close()
|
||||
|
||||
resp = Response(sse(), mimetype="text/event-stream")
|
||||
resp.headers.add_header("Cache-control", "no-cache")
|
||||
|
||||
@@ -367,11 +367,12 @@ async def test_tool() -> Response:
|
||||
tool_call_sessions.append(tool_call_session)
|
||||
result = await thread_pool_exec(tool_call_session.tool_call, tool_name, arguments, timeout)
|
||||
|
||||
# PERF: blocking call to close sessions — consider moving to background thread or task queue
|
||||
await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
|
||||
return get_json_result(data=result)
|
||||
except Exception as e:
|
||||
return server_error_response(e)
|
||||
finally:
|
||||
# PERF: blocking call to close sessions — consider moving to background thread or task queue
|
||||
await thread_pool_exec(close_multiple_mcp_toolcall_sessions, tool_call_sessions)
|
||||
|
||||
|
||||
@manager.route("/cache_tools", methods=["POST"]) # noqa: F821
|
||||
|
||||
@@ -233,15 +233,18 @@ async def completion(tenant_id, agent_id, session_id=None, **kwargs):
|
||||
"files": files
|
||||
})
|
||||
txt = ""
|
||||
async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
|
||||
ans["session_id"] = session_id
|
||||
if ans["event"] == "message":
|
||||
txt += ans["data"]["content"]
|
||||
if ans["data"].get("start_to_think", False):
|
||||
txt += "<think>"
|
||||
elif ans["data"].get("end_to_think", False):
|
||||
txt += "</think>"
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
|
||||
try:
|
||||
async for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
|
||||
ans["session_id"] = session_id
|
||||
if ans["event"] == "message":
|
||||
txt += ans["data"]["content"]
|
||||
if ans["data"].get("start_to_think", False):
|
||||
txt += "<think>"
|
||||
elif ans["data"].get("end_to_think", False):
|
||||
txt += "</think>"
|
||||
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
|
||||
finally:
|
||||
canvas.close()
|
||||
|
||||
conv.message.append({"role": "assistant", "content": txt, "created_at": time.time(), "id": message_id})
|
||||
conv.reference = canvas.get_reference()
|
||||
|
||||
@@ -685,11 +685,12 @@ def get_mcp_tools(mcp_servers: list, timeout: float | int = 10) -> tuple[dict, s
|
||||
tool_dict["enabled"] = cached_tool.get("enabled", True)
|
||||
results[server_key].append(tool_dict)
|
||||
|
||||
# PERF: blocking call to close sessions — consider moving to background thread or task queue
|
||||
close_multiple_mcp_toolcall_sessions(tool_call_sessions)
|
||||
return results, ""
|
||||
except Exception as e:
|
||||
return {}, str(e)
|
||||
finally:
|
||||
# PERF: blocking call to close sessions — consider moving to background thread or task queue
|
||||
close_multiple_mcp_toolcall_sessions(tool_call_sessions)
|
||||
|
||||
|
||||
async def is_strong_enough(chat_model, embedding_model):
|
||||
|
||||
Reference in New Issue
Block a user