From c5a46fda448c943aef48f9e477330af02f89a0a8 Mon Sep 17 00:00:00 2001 From: Wang Qi Date: Thu, 21 May 2026 19:23:41 +0800 Subject: [PATCH] Fix: is bound to a different event loop (#15100) Fix: is bound to a different event loop --- common/asyncio_utils.py | 56 ++++++++++++++++++++++++++++++++++++++++ rag/graphrag/utils.py | 3 ++- rag/svr/task_executor.py | 11 ++++---- 3 files changed, 64 insertions(+), 6 deletions(-) create mode 100644 common/asyncio_utils.py diff --git a/common/asyncio_utils.py b/common/asyncio_utils.py new file mode 100644 index 0000000000..12f5e0220a --- /dev/null +++ b/common/asyncio_utils.py @@ -0,0 +1,56 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import weakref + + +class LoopLocalSemaphore: + """ + Asyncio synchronization primitives bind to the event loop that waits on them. + Keep one semaphore per running loop for module-level concurrency limiters. + """ + + def __init__(self, value: int): + self._value = int(value) + self._semaphores: "weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, asyncio.Semaphore]" = ( + weakref.WeakKeyDictionary() + ) + + def _get(self) -> asyncio.Semaphore: + loop = asyncio.get_running_loop() + for cached_loop in list(self._semaphores): + if cached_loop.is_closed(): + self._semaphores.pop(cached_loop, None) + sem = self._semaphores.get(loop) + if sem is None: + sem = asyncio.Semaphore(self._value) + self._semaphores[loop] = sem + return sem + + async def acquire(self) -> bool: + return await self._get().acquire() + + def release(self) -> None: + self._get().release() + + async def __aenter__(self): + await self.acquire() + return self + + async def __aexit__(self, exc_type, exc, tb): + self.release() + return False diff --git a/rag/graphrag/utils.py b/rag/graphrag/utils.py index fa29ebe389..c92fe0c6fe 100644 --- a/rag/graphrag/utils.py +++ b/rag/graphrag/utils.py @@ -28,6 +28,7 @@ from networkx.readwrite import json_graph from common.misc_utils import get_uuid from common.connection_utils import timeout +from common.asyncio_utils import LoopLocalSemaphore from rag.nlp import rag_tokenizer, search from rag.utils.redis_conn import REDIS_CONN from common import settings @@ -37,7 +38,7 @@ GRAPH_FIELD_SEP = "" ErrorHandlerFn = Callable[[BaseException | None, str | None, dict | None], None] -chat_limiter = asyncio.Semaphore(int(os.environ.get("MAX_CONCURRENT_CHATS", 10))) +chat_limiter = LoopLocalSemaphore(int(os.environ.get("MAX_CONCURRENT_CHATS", 10))) # Doc-store insert batching for GraphRAG subgraph/node/edge/community_report # chunks. Defaults (64 docs per batch, up to 4 batches in flight) mirror the diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index e639ba6e46..9e263e8b5a 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -89,6 +89,7 @@ from rag.utils.redis_conn import REDIS_CONN, RedisDistributedLock from rag.graphrag.utils import chat_limiter from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc from common.exceptions import TaskCanceledException +from common.asyncio_utils import LoopLocalSemaphore from common import settings from common.constants import PAGERANK_FLD, TAG_FLD, SVR_CONSUMER_GROUP_NAME from rag.utils.table_es_metadata import ( @@ -142,11 +143,11 @@ CURRENT_TASKS = {} MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5")) MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDERS', "1")) MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10')) -task_limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS) -chunk_limiter = asyncio.Semaphore(MAX_CONCURRENT_CHUNK_BUILDERS) -embed_limiter = asyncio.Semaphore(MAX_CONCURRENT_CHUNK_BUILDERS) -minio_limiter = asyncio.Semaphore(MAX_CONCURRENT_MINIO) -kg_limiter = asyncio.Semaphore(2) +task_limiter = LoopLocalSemaphore(MAX_CONCURRENT_TASKS) +chunk_limiter = LoopLocalSemaphore(MAX_CONCURRENT_CHUNK_BUILDERS) +embed_limiter = LoopLocalSemaphore(MAX_CONCURRENT_CHUNK_BUILDERS) +minio_limiter = LoopLocalSemaphore(MAX_CONCURRENT_MINIO) +kg_limiter = LoopLocalSemaphore(2) WORKER_HEARTBEAT_TIMEOUT = int(os.environ.get('WORKER_HEARTBEAT_TIMEOUT', '120')) stop_event = threading.Event()