From a9ec78cb9cdf82c0a0dd179d1a9ad591a4d5b4db Mon Sep 17 00:00:00 2001 From: Wang Qi Date: Fri, 22 May 2026 13:16:39 +0800 Subject: [PATCH] Refactor: enahnce retry and timeout (#14983) ### What problem does this PR solve? 1. Enhance retry and timeout, and adjust the default timeout 2. NER: spacy do not batch chunks 3. extract _has_cancel_and_exit 4. enhance log messages ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Refactoring --- api/utils/api_utils.py | 9 + api/utils/validation_utils.py | 9 + rag/graphrag/general/graph_extractor.py | 2 +- rag/graphrag/general/index.py | 399 ++++++++++++++++++------ rag/graphrag/light/graph_extractor.py | 2 +- rag/graphrag/ner/graph_extractor.py | 2 +- rag/svr/task_executor.py | 9 + test/testcases/configs.py | 9 + 8 files changed, 337 insertions(+), 104 deletions(-) diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 5e034f0c50..8c6abca7d5 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -440,6 +440,15 @@ def get_parser_config(chunk_method, parser_config): ], "method": "light", "batch_chunk_token_size": 4096, + "retry_attempts": 2, + "retry_backoff_seconds": 2.0, + "retry_backoff_max_seconds": 60.0, + "build_subgraph_timeout_per_chunk_seconds": 300, + "build_subgraph_min_timeout_seconds": 600, + "merge_timeout_seconds": 180, + "resolution_timeout_seconds": 1800, + "community_timeout_seconds": 1800, + "lock_acquire_timeout_seconds": 600, }, "parent_child": { "use_parent_child": False, diff --git a/api/utils/validation_utils.py b/api/utils/validation_utils.py index 861f94ee22..b499af8fe4 100644 --- a/api/utils/validation_utils.py +++ b/api/utils/validation_utils.py @@ -363,6 +363,15 @@ class GraphragConfig(Base): community: Annotated[bool, Field(default=False)] resolution: Annotated[bool, Field(default=False)] batch_chunk_token_size: Annotated[int, Field(default=4096, ge=512, le=8196)] + retry_attempts: Annotated[int, Field(default=2, ge=1, le=10)] + retry_backoff_seconds: Annotated[float, Field(default=2.0, ge=0.0, le=600.0)] + retry_backoff_max_seconds: Annotated[float, Field(default=60.0, ge=0.0, le=3600.0)] + build_subgraph_timeout_per_chunk_seconds: Annotated[int, Field(default=300, ge=1, le=86400)] + build_subgraph_min_timeout_seconds: Annotated[int, Field(default=600, ge=1, le=86400)] + merge_timeout_seconds: Annotated[int, Field(default=180, ge=0, le=86400)] + resolution_timeout_seconds: Annotated[int, Field(default=1800, ge=0, le=86400)] + community_timeout_seconds: Annotated[int, Field(default=1800, ge=0, le=86400)] + lock_acquire_timeout_seconds: Annotated[int, Field(default=600, ge=0, le=86400)] class ParentChildConfig(Base): diff --git a/rag/graphrag/general/graph_extractor.py b/rag/graphrag/general/graph_extractor.py index 95dc87ef2d..5f06c36879 100644 --- a/rag/graphrag/general/graph_extractor.py +++ b/rag/graphrag/general/graph_extractor.py @@ -147,4 +147,4 @@ class GraphExtractor(Extractor): maybe_nodes, maybe_edges = self._entities_and_relations(chunk_key, records, self._prompt_variables[self._tuple_delimiter_key]) out_results.append((maybe_nodes, maybe_edges, token_count)) if self.callback: - self.callback(0.5+0.1*len(out_results)/num_chunks, msg = f"Entities extraction of chunk {chunk_seq} {len(out_results)}/{num_chunks} done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {token_count} tokens.") + self.callback(0.5+0.1*len(out_results)/num_chunks, msg = f"Entities extraction of chunk {chunk_seq+1} {len(out_results)}/{num_chunks} done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {token_count} tokens.") diff --git a/rag/graphrag/general/index.py b/rag/graphrag/general/index.py index 396f3aae0b..98e7ab8fd3 100644 --- a/rag/graphrag/general/index.py +++ b/rag/graphrag/general/index.py @@ -16,7 +16,6 @@ import asyncio import json import logging -import os import networkx as nx @@ -55,21 +54,64 @@ from common.doc_store.doc_store_base import OrderByExpr DEFAULT_GRAPHRAG_BATCH_CHUNK_TOKEN_SIZE = 4096 +MIN_GRAPHRAG_BATCH_CHUNK_TOKEN_SIZE = 512 +MAX_GRAPHRAG_BATCH_CHUNK_TOKEN_SIZE = 8196 +DEFAULT_GRAPHRAG_RETRY_ATTEMPTS = 2 +DEFAULT_GRAPHRAG_RETRY_BACKOFF_SECONDS = 2.0 +DEFAULT_GRAPHRAG_RETRY_BACKOFF_MAX_SECONDS = 60.0 +DEFAULT_GRAPHRAG_BUILD_SUBGRAPH_TIMEOUT_PER_CHUNK_SECONDS = 300 +DEFAULT_GRAPHRAG_BUILD_SUBGRAPH_MIN_TIMEOUT_SECONDS = 600 +DEFAULT_GRAPHRAG_MERGE_TIMEOUT_SECONDS = 180 +DEFAULT_GRAPHRAG_RESOLUTION_TIMEOUT_SECONDS = 1800 +DEFAULT_GRAPHRAG_COMMUNITY_TIMEOUT_SECONDS = 1800 +DEFAULT_GRAPHRAG_LOCK_ACQUIRE_TIMEOUT_SECONDS = 600 -def _positive_int_config(config: dict, key: str, default: int) -> int: +def _bounded_int_config(config: dict, key: str, default: int, minimum: int, maximum: int) -> int: value = config.get(key, default) + if value is None: + return default try: value = int(value) except (TypeError, ValueError): logging.warning("Invalid GraphRAG config %s=%r, using default %s", key, value, default) return default - if value < 512 or value > 8196: + if value < minimum or value > maximum: logging.warning("Invalid GraphRAG config %s=%r, using default %s", key, value, default) return default return value +def _bounded_float_config(config: dict, key: str, default: float, minimum: float, maximum: float) -> float: + value = config.get(key, default) + if value is None: + return default + try: + value = float(value) + except (TypeError, ValueError): + logging.warning("Invalid GraphRAG config %s=%r, using default %s", key, value, default) + return default + if value < minimum or value > maximum: + logging.warning("Invalid GraphRAG config %s=%r, using default %s", key, value, default) + return default + return value + + +def _batch_chunk_token_size_config(config: dict, key: str, default: int) -> int: + return _bounded_int_config(config, key, default, MIN_GRAPHRAG_BATCH_CHUNK_TOKEN_SIZE, MAX_GRAPHRAG_BATCH_CHUNK_TOKEN_SIZE) + + +def _lock_acquire_timeout_config(config: dict) -> int: + value = _bounded_int_config(config, "lock_acquire_timeout_seconds", DEFAULT_GRAPHRAG_LOCK_ACQUIRE_TIMEOUT_SECONDS, 0, 86400) + if value == 0: + return DEFAULT_GRAPHRAG_LOCK_ACQUIRE_TIMEOUT_SECONDS + return value + + +def _select_extractor_type(graphrag_config: dict): + return graphrag_config.get("method", "light") + + def _select_extractor(graphrag_config: dict): """Return the extractor class matching ``graphrag_config["method"]``. @@ -89,6 +131,74 @@ def _select_extractor(graphrag_config: dict): return LightKGExt +def _has_cancel_and_exit(task_id: str, message: str, callback=None) -> None: + if not task_id or not has_canceled(task_id): + return + if callback: + callback(msg=message) + raise TaskCanceledException(f"Task {task_id} was cancelled") + + +async def _run_with_retry( + label: str, + coro_factory, + *, + attempts: int, + timeout_seconds: int | float, + backoff_seconds: float, + backoff_max_seconds: float, + callback=None, + task_id: str = "", +): + attempts = max(1, attempts) + last_error = None + for attempt in range(1, attempts + 1): + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before {label}.", callback) + try: + if timeout_seconds and timeout_seconds > 0: + return await asyncio.wait_for(coro_factory(), timeout=timeout_seconds) + return await coro_factory() + except (TaskCanceledException, asyncio.CancelledError): + raise + except asyncio.TimeoutError as e: + last_error = e + error_msg = f"timeout after {timeout_seconds}s" + except Exception as e: + last_error = e + error_msg = repr(e) + + if attempt >= attempts: + if callback: + callback(msg=f"[GraphRAG] {label} FAILED after {attempt}/{attempts} attempts: {error_msg}") + raise last_error + + wait = min(backoff_max_seconds, backoff_seconds * (2 ** (attempt - 1))) + if callback: + callback(msg=f"[GraphRAG] {label} failed attempt {attempt}/{attempts}: {error_msg}; retrying in {wait:.1f}s") + logging.warning("GraphRAG %s failed attempt %s/%s: %s", label, attempt, attempts, error_msg) + if wait > 0: + await asyncio.sleep(wait) + + +async def _acquire_lock(lock: RedisDistributedLock, label: str, timeout_seconds: int, callback, task_id: str): + if timeout_seconds <= 0: + timeout_seconds = DEFAULT_GRAPHRAG_LOCK_ACQUIRE_TIMEOUT_SECONDS + deadline = asyncio.get_running_loop().time() + timeout_seconds + while True: + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before acquiring {label}.", callback) + if lock.acquire(): + return + + remaining_seconds = deadline - asyncio.get_running_loop().time() + if remaining_seconds <= 0: + msg = f"[GraphRAG] failed to acquire {label} after {timeout_seconds}s" + if callback: + callback(msg=msg) + raise asyncio.TimeoutError(msg) + + await asyncio.sleep(min(10, remaining_seconds)) + + async def load_subgraph_from_store(tenant_id: str, kb_id: str, doc_id: str): """Load a previously saved subgraph from the doc store. @@ -151,11 +261,36 @@ async def run_graphrag_for_kb( max_parallel_docs: int = 4, ) -> dict: tenant_id, kb_id = row["tenant_id"], row["kb_id"] - enable_timeout_assertion = os.environ.get("ENABLE_TIMEOUT_ASSERTION") + task_id = row["id"] start = asyncio.get_running_loop().time() fields_for_chunks = ["content_with_weight", "doc_id"] graphrag_config = kb_parser_config.get("graphrag", {}) - batch_chunk_token_size = _positive_int_config(graphrag_config, "batch_chunk_token_size", DEFAULT_GRAPHRAG_BATCH_CHUNK_TOKEN_SIZE) + batch_chunk_token_size = _batch_chunk_token_size_config(graphrag_config, "batch_chunk_token_size", DEFAULT_GRAPHRAG_BATCH_CHUNK_TOKEN_SIZE) + retry_attempts = _bounded_int_config(graphrag_config, "retry_attempts", DEFAULT_GRAPHRAG_RETRY_ATTEMPTS, 1, 10) + retry_backoff_seconds = _bounded_float_config(graphrag_config, "retry_backoff_seconds", DEFAULT_GRAPHRAG_RETRY_BACKOFF_SECONDS, 0.0, 600.0) + retry_backoff_max_seconds = _bounded_float_config(graphrag_config, "retry_backoff_max_seconds", DEFAULT_GRAPHRAG_RETRY_BACKOFF_MAX_SECONDS, 0.0, 3600.0) + build_subgraph_retry_attempts = _bounded_int_config(graphrag_config, "build_subgraph_retry_attempts", retry_attempts, 1, 10) + merge_retry_attempts = _bounded_int_config(graphrag_config, "merge_retry_attempts", retry_attempts, 1, 10) + resolution_retry_attempts = _bounded_int_config(graphrag_config, "resolution_retry_attempts", retry_attempts, 1, 10) + community_retry_attempts = _bounded_int_config(graphrag_config, "community_retry_attempts", retry_attempts, 1, 10) + build_subgraph_timeout_per_chunk_seconds = _bounded_int_config( + graphrag_config, + "build_subgraph_timeout_per_chunk_seconds", + DEFAULT_GRAPHRAG_BUILD_SUBGRAPH_TIMEOUT_PER_CHUNK_SECONDS, + 1, + 86400, + ) + build_subgraph_min_timeout_seconds = _bounded_int_config( + graphrag_config, + "build_subgraph_min_timeout_seconds", + DEFAULT_GRAPHRAG_BUILD_SUBGRAPH_MIN_TIMEOUT_SECONDS, + 1, + 86400, + ) + merge_timeout_seconds = _bounded_int_config(graphrag_config, "merge_timeout_seconds", DEFAULT_GRAPHRAG_MERGE_TIMEOUT_SECONDS, 0, 86400) + resolution_timeout_seconds = _bounded_int_config(graphrag_config, "resolution_timeout_seconds", DEFAULT_GRAPHRAG_RESOLUTION_TIMEOUT_SECONDS, 0, 86400) + community_timeout_seconds = _bounded_int_config(graphrag_config, "community_timeout_seconds", DEFAULT_GRAPHRAG_COMMUNITY_TIMEOUT_SECONDS, 0, 86400) + lock_acquire_timeout_seconds = _lock_acquire_timeout_config(graphrag_config) if not doc_ids: logging.info(f"Fetching all docs for {kb_id}") @@ -174,8 +309,10 @@ async def run_graphrag_for_kb( doc_ids = list(dict.fromkeys(doc_ids)) if not doc_ids: - callback(msg=f"[GraphRAG] kb:{kb_id} has no processable doc_id.") + callback(msg=f"[GraphRAG] dataset:{kb_id} has no processable doc_id.") return {"ok_docs": [], "failed_docs": [], "total_docs": 0, "total_chunks": 0, "seconds": 0.0} + else: + callback(msg=f"[GraphRAG] dataset:{kb_id} has {len(doc_ids)} documents to process.") def load_doc_chunks(doc_id: str) -> list[str]: from common.token_utils import num_tokens_from_string @@ -194,6 +331,10 @@ async def run_graphrag_for_kb( callback(msg=f"[GraphRAG] chunk_list returned {len(raw_chunks)} raw chunks for doc:{doc_id}") + # For NER-based extractionm, no need to batch extract entity and relation + if _select_extractor_type(graphrag_config) == "ner": + return raw_chunks + for d in raw_chunks: content = d["content_with_weight"] if num_tokens_from_string(current_chunk + content) < batch_chunk_token_size: @@ -206,6 +347,7 @@ async def run_graphrag_for_kb( if current_chunk: chunks.append(current_chunk) + callback(msg=f"[GraphRAG] chunk_list combine {len(raw_chunks)} raw chunks to {len(chunks)} chunks for LLM extraction for doc:{doc_id}") return chunks total_chunks = 0 @@ -218,33 +360,42 @@ async def run_graphrag_for_kb( async def build_one(doc_id: str): nonlocal total_chunks - if has_canceled(row["id"]): - callback(msg=f"Task {row['id']} cancelled, stopping execution.") - raise TaskCanceledException(f"Task {row['id']} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled, stopping execution.", callback) kg_extractor = _select_extractor(graphrag_config) async with semaphore: # CHECKPOINT: bounded by semaphore so doc-store lookups respect max_parallel_docs + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before loading checkpoint for doc {doc_id}.", callback) existing_sg = await load_subgraph_from_store(tenant_id, kb_id, doc_id) if existing_sg: subgraphs[doc_id] = existing_sg callback(msg=f"[GraphRAG] doc:{doc_id} subgraph found in store, skipping LLM extraction.") return try: + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before loading chunks for doc {doc_id}.", callback) chunks = load_doc_chunks(doc_id) total_chunks += len(chunks) if not chunks: callback(msg=f"[GraphRAG] doc:{doc_id} has no available chunks, skip generation.") return - deadline = max(120, len(chunks) * 60 * 10) if enable_timeout_assertion else 10000000000 - msg = f"[GraphRAG] build_subgraph doc:{doc_id}" - callback(msg=f"{msg} start (chunks={len(chunks)}, timeout={deadline}s)") + build_subgraph_timeout_seconds = max( + build_subgraph_min_timeout_seconds, + len(chunks) * build_subgraph_timeout_per_chunk_seconds, + ) + label = f"build_subgraph doc:{doc_id}" + msg = f"[GraphRAG] {label}" + callback(msg=f"{msg} start (chunks={len(chunks)}, timeout={build_subgraph_timeout_seconds}s, attempts={build_subgraph_retry_attempts})") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before subgraph generation for doc {doc_id}.", callback) try: - sg = await asyncio.wait_for( - generate_subgraph( + async def build_subgraph_attempt(): + checkpoint_sg = await load_subgraph_from_store(tenant_id, kb_id, doc_id) + if checkpoint_sg: + callback(msg=f"[GraphRAG] doc:{doc_id} subgraph found in store during retry, skipping LLM extraction.") + return checkpoint_sg + return await generate_subgraph( kg_extractor, tenant_id, kb_id, @@ -255,13 +406,22 @@ async def run_graphrag_for_kb( chat_model, embedding_model, callback, - task_id=row["id"] - ), - timeout=deadline, + task_id=task_id, + ) + + sg = await _run_with_retry( + label, + build_subgraph_attempt, + attempts=build_subgraph_retry_attempts, + timeout_seconds=build_subgraph_timeout_seconds, + backoff_seconds=retry_backoff_seconds, + backoff_max_seconds=retry_backoff_max_seconds, + callback=callback, + task_id=task_id, ) except asyncio.TimeoutError: - failed_docs.append((doc_id, "timeout")) - callback(msg=f"{msg} FAILED: timeout") + failed_docs.append((doc_id, f"timeout after {build_subgraph_timeout_seconds}s")) + callback(msg=f"{msg} FAILED: timeout after {build_subgraph_timeout_seconds}s") return if sg: subgraphs[doc_id] = sg @@ -271,13 +431,12 @@ async def run_graphrag_for_kb( callback(msg=f"{msg} empty") except TaskCanceledException as canceled: callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} FAILED: {canceled}") + raise except Exception as e: failed_docs.append((doc_id, repr(e))) callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} FAILED: {e!r}") - if has_canceled(row["id"]): - callback(msg=f"Task {row['id']} cancelled before processing documents.") - raise TaskCanceledException(f"Task {row['id']} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before processing documents.", callback) tasks = [asyncio.create_task(build_one(doc_id)) for doc_id in doc_ids] try: @@ -290,12 +449,10 @@ async def run_graphrag_for_kb( raise if total_chunks == 0 and not subgraphs: - callback(msg=f"[GraphRAG] kb:{kb_id} has no available chunks in all documents, skip.") + callback(msg=f"[GraphRAG] dataset:{kb_id} has no available chunks in all documents, skip.") return {"ok_docs": [], "failed_docs": [(doc_id, "no available chunks") for doc_id in doc_ids], "total_docs": len(doc_ids), "total_chunks": 0, "seconds": 0.0} - if has_canceled(row["id"]): - callback(msg=f"Task {row['id']} cancelled after document processing.") - raise TaskCanceledException(f"Task {row['id']} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled after document processing.", callback) ok_docs = [d for d in doc_ids if d in subgraphs] final_graph = None @@ -307,47 +464,70 @@ async def run_graphrag_for_kb( community_pending = with_community and not has_phase_marker(kb_id, PHASE_COMMUNITY) if not ok_docs and not resolution_pending and not community_pending: - callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs to merge and no phases pending, end.") + callback(msg=f"[GraphRAG] dataset:{kb_id} no subgraphs to merge and no phases pending, end.") now = asyncio.get_running_loop().time() return {"ok_docs": [], "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start} - kb_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value="batch_merge", timeout=1200) - await kb_lock.spin_acquire() - callback(msg=f"[GraphRAG] kb:{kb_id} merge lock acquired") - - if has_canceled(row["id"]): - callback(msg=f"Task {row['id']} cancelled before merging subgraphs.") - raise TaskCanceledException(f"Task {row['id']} was cancelled") + kb_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=f"batch_merge:{task_id}", timeout=1200) + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before acquiring merge lock.", callback) + await _acquire_lock(kb_lock, "merge lock", lock_acquire_timeout_seconds, callback, task_id) + callback(msg=f"[GraphRAG] dataset:{kb_id} merge lock acquired") try: + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before merging subgraphs.", callback) + union_nodes: set = set() for doc_id in ok_docs: + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before merging subgraph for doc {doc_id}.", callback) sg = subgraphs[doc_id] union_nodes.update(set(sg.nodes())) - new_graph = await merge_subgraph( - tenant_id, - kb_id, - doc_id, - sg, - embedding_model, - callback, - ) + try: + async def merge_subgraph_attempt(): + current_graph = await get_graph(tenant_id, kb_id) + if current_graph and doc_id in current_graph.graph.get("source_id", []): + callback(msg=f"[GraphRAG] merge_subgraph doc:{doc_id} already merged, skipping retry.") + return current_graph + return await merge_subgraph( + tenant_id, + kb_id, + doc_id, + sg, + embedding_model, + callback, + ) + + new_graph = await _run_with_retry( + f"merge_subgraph doc:{doc_id}", + merge_subgraph_attempt, + attempts=merge_retry_attempts, + timeout_seconds=merge_timeout_seconds, + backoff_seconds=retry_backoff_seconds, + backoff_max_seconds=retry_backoff_max_seconds, + callback=callback, + task_id=task_id, + ) + except TaskCanceledException: + raise + except Exception as e: + failed_docs.append((doc_id, f"merge failed: {e!r}")) + callback(msg=f"[GraphRAG] merge_subgraph doc:{doc_id} FAILED: {e!r}") + raise if new_graph is not None: final_graph = new_graph if ok_docs and final_graph is None: - callback(msg=f"[GraphRAG] kb:{kb_id} merge finished (no in-memory graph returned).") + callback(msg=f"[GraphRAG] dataset:{kb_id} merge finished (no in-memory graph returned).") elif ok_docs: - callback(msg=f"[GraphRAG] kb:{kb_id} merge finished, graph ready.") + callback(msg=f"[GraphRAG] dataset:{kb_id} merge finished, graph ready.") # New content was merged into the global graph; any prior # resolution/community results are now stale and must be redone # on this or a future run. Clear phase markers accordingly. clear_phase_markers(kb_id) resolution_pending = with_resolution community_pending = with_community - callback(msg=f"[GraphRAG] kb:{kb_id} cleared phase markers after merge.") + callback(msg=f"[GraphRAG] dataset:{kb_id} cleared phase markers after merge.") finally: kb_lock.release() @@ -358,26 +538,27 @@ async def run_graphrag_for_kb( if not resolution_pending and not community_pending: now = asyncio.get_running_loop().time() - callback(msg=f"[GraphRAG] kb:{kb_id} all requested phases already complete; nothing to do.") + callback(msg=f"[GraphRAG] dataset:{kb_id} all requested phases already complete; nothing to do.") return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start} - if has_canceled(row["id"]): - callback(msg=f"Task {row['id']} cancelled before resolution/community extraction.") - raise TaskCanceledException(f"Task {row['id']} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before resolution/community extraction.", callback) - await kb_lock.spin_acquire() - callback(msg=f"[GraphRAG] kb:{kb_id} post-merge lock acquired for resolution/community") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before acquiring post-merge lock.", callback) + await _acquire_lock(kb_lock, "post-merge lock", lock_acquire_timeout_seconds, callback, task_id) + callback(msg=f"[GraphRAG] dataset:{kb_id} post-merge lock acquired for resolution/community") try: + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before resolution/community extraction.", callback) + # Resume path: no docs were merged this round but pending phases # require the previously-persisted graph. Load it from the doc store. if final_graph is None: final_graph = await get_graph(tenant_id, kb_id) if final_graph is None: - callback(msg=f"[GraphRAG] kb:{kb_id} no persisted graph found; cannot run resolution/community.") + callback(msg=f"[GraphRAG] dataset:{kb_id} no persisted graph found; cannot run resolution/community.") now = asyncio.get_running_loop().time() return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start} - callback(msg=f"[GraphRAG] kb:{kb_id} loaded persisted graph for resume.") + callback(msg=f"[GraphRAG] dataset:{kb_id} loaded persisted graph for resume.") subgraph_nodes = set() for sg in subgraphs.values(): @@ -389,35 +570,65 @@ async def run_graphrag_for_kb( subgraph_nodes = set(final_graph.nodes()) if resolution_pending: - await resolve_entities( - final_graph, - subgraph_nodes, - tenant_id, - kb_id, - None, - chat_model, - embedding_model, - callback, - task_id=row["id"], + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before entity resolution.", callback) + + async def run_resolution_attempt(): + graph_for_resolution = final_graph.copy() + await resolve_entities( + graph_for_resolution, + subgraph_nodes, + tenant_id, + kb_id, + None, + chat_model, + embedding_model, + callback, + task_id=task_id, + ) + return graph_for_resolution + + final_graph = await _run_with_retry( + "entity resolution", + run_resolution_attempt, + attempts=resolution_retry_attempts, + timeout_seconds=resolution_timeout_seconds, + backoff_seconds=retry_backoff_seconds, + backoff_max_seconds=retry_backoff_max_seconds, + callback=callback, + task_id=task_id, ) set_phase_marker(kb_id, PHASE_RESOLUTION) elif with_resolution: - callback(msg=f"[GraphRAG] kb:{kb_id} resolution already completed previously, skipping.") + callback(msg=f"[GraphRAG] dataset:{kb_id} resolution already completed previously, skipping.") if community_pending: - await extract_community( - final_graph, - tenant_id, - kb_id, - None, - chat_model, - embedding_model, - callback, - task_id=row["id"], + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before community extraction.", callback) + + async def run_community_attempt(): + await extract_community( + final_graph.copy(), + tenant_id, + kb_id, + None, + chat_model, + embedding_model, + callback, + task_id=task_id, + ) + + await _run_with_retry( + "community extraction", + run_community_attempt, + attempts=community_retry_attempts, + timeout_seconds=community_timeout_seconds, + backoff_seconds=retry_backoff_seconds, + backoff_max_seconds=retry_backoff_max_seconds, + callback=callback, + task_id=task_id, ) set_phase_marker(kb_id, PHASE_COMMUNITY) elif with_community: - callback(msg=f"[GraphRAG] kb:{kb_id} community detection already completed previously, skipping.") + callback(msg=f"[GraphRAG] dataset:{kb_id} community detection already completed previously, skipping.") finally: kb_lock.release() @@ -445,14 +656,13 @@ async def generate_subgraph( callback, task_id: str = "", ): - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled during subgraph generation for doc {doc_id}.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled during subgraph generation for doc {doc_id}.", callback) contains = await does_graph_contains(tenant_id, kb_id, doc_id) if contains: callback(msg=f"Graph already contains {doc_id}") return None + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before extracting entities for doc {doc_id}.", callback) start = asyncio.get_running_loop().time() ext = extractor( llm_bdl, @@ -463,9 +673,7 @@ async def generate_subgraph( subgraph = nx.Graph() for ent in ents: - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled during entity processing for doc {doc_id}.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled during entity processing for doc {doc_id}.", callback) assert "description" in ent, f"entity {ent} does not have description" ent["source_id"] = [doc_id] @@ -473,9 +681,7 @@ async def generate_subgraph( ignored_rels = 0 for rel in rels: - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled during relationship processing for doc {doc_id}.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled during relationship processing for doc {doc_id}.", callback) assert "description" in rel, f"relation {rel} does not have description" if not subgraph.has_node(rel["src_id"]) or not subgraph.has_node(rel["tgt_id"]): @@ -489,6 +695,7 @@ async def generate_subgraph( ) if ignored_rels: callback(msg=f"ignored {ignored_rels} relations due to missing entities.") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before tidying subgraph for doc {doc_id}.", callback) tidy_graph(subgraph, callback, check_attribute=False) subgraph.graph["source_id"] = [doc_id] @@ -501,6 +708,7 @@ async def generate_subgraph( "removed_kwd": "N", } cid = chunk_id(chunk) + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before saving subgraph for doc {doc_id}.", callback) await thread_pool_exec(settings.docStoreConn.delete,{"knowledge_graph_kwd": "subgraph", "source_id": doc_id},search.index_name(tenant_id),kb_id,) await thread_pool_exec(settings.docStoreConn.insert,[{"id": cid, **chunk}],search.index_name(tenant_id),kb_id,) now = asyncio.get_running_loop().time() @@ -551,9 +759,7 @@ async def resolve_entities( task_id: str = "", ): # Check if task has been canceled before resolution - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled during entity resolution.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled during entity resolution.", callback) start = asyncio.get_running_loop().time() er = EntityResolution( @@ -565,10 +771,9 @@ async def resolve_entities( callback(msg=f"Graph resolution removed {len(change.removed_nodes)} nodes and {len(change.removed_edges)} edges.") callback(msg="Graph resolution updated pagerank.") - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled after entity resolution.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled after entity resolution.", callback) + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before saving resolved graph.", callback) await set_graph(tenant_id, kb_id, embed_bdl, graph, change, callback) now = asyncio.get_running_loop().time() callback(msg=f"Graph resolution done in {now - start:.2f}s.") @@ -585,9 +790,7 @@ async def extract_community( callback, task_id: str = "", ): - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled before community extraction.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled before community extraction.", callback) start = asyncio.get_running_loop().time() ext = CommunityReportsExtractor( @@ -595,9 +798,7 @@ async def extract_community( ) cr = await ext(graph, callback=callback, task_id=task_id) - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled during community extraction.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled during community extraction.", callback) community_structure = cr.structured_output community_reports = cr.output @@ -606,9 +807,7 @@ async def extract_community( now = asyncio.get_running_loop().time() callback(msg=f"Graph extracted {len(cr.structured_output)} communities in {now - start:.2f}s.") start = now - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled during community indexing.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled during community indexing.", callback) chunks = [] for stru, rep in zip(community_structure, community_reports): @@ -680,9 +879,7 @@ async def extract_community( except Exception: logging.exception("Failed to prune %d stale community reports for kb %s", len(stale_ids), kb_id) - if task_id and has_canceled(task_id): - callback(msg=f"Task {task_id} cancelled after community indexing.") - raise TaskCanceledException(f"Task {task_id} was cancelled") + _has_cancel_and_exit(task_id, f"Task {task_id} cancelled after community indexing.", callback) now = asyncio.get_running_loop().time() callback(msg=f"Graph indexed {len(cr.structured_output)} communities in {now - start:.2f}s.") diff --git a/rag/graphrag/light/graph_extractor.py b/rag/graphrag/light/graph_extractor.py index d2ce83534c..f775468c8a 100644 --- a/rag/graphrag/light/graph_extractor.py +++ b/rag/graphrag/light/graph_extractor.py @@ -121,5 +121,5 @@ class GraphExtractor(Extractor): if self.callback: self.callback( 0.5 + 0.1 * len(out_results) / num_chunks, - msg=f"Entities extraction of chunk {chunk_seq} {len(out_results)}/{num_chunks} done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {token_count} tokens.", + msg=f"Entities extraction of chunk {chunk_seq+1} {len(out_results)}/{num_chunks} done, {len(maybe_nodes)} nodes, {len(maybe_edges)} edges, {token_count} tokens.", ) diff --git a/rag/graphrag/ner/graph_extractor.py b/rag/graphrag/ner/graph_extractor.py index 67d97346c1..7e2fc69de7 100644 --- a/rag/graphrag/ner/graph_extractor.py +++ b/rag/graphrag/ner/graph_extractor.py @@ -521,7 +521,7 @@ class GraphExtractor(Extractor): if self.callback: self.callback( 0.5 + 0.1 * len(out_results) / num_chunks, - msg=f"[spacy] Entities extraction of chunk {chunk_seq} " + msg=f"[spacy] Entities extraction of chunk {chunk_seq+1} " f"{len(out_results)}/{num_chunks} done, " f"{len(maybe_nodes)} nodes, {len(maybe_edges)} edges, " f"{token_count} tokens.", diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 3f26cd779d..a46c6051b4 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -1395,6 +1395,15 @@ async def do_handle_task(task): ], "method": "light", "batch_chunk_token_size": 4096, + "retry_attempts": 2, + "retry_backoff_seconds": 2.0, + "retry_backoff_max_seconds": 60.0, + "build_subgraph_timeout_per_chunk_seconds": 300, + "build_subgraph_min_timeout_seconds": 600, + "merge_timeout_seconds": 180, + "resolution_timeout_seconds": 1800, + "community_timeout_seconds": 1800, + "lock_acquire_timeout_seconds": 600, } } ) diff --git a/test/testcases/configs.py b/test/testcases/configs.py index a4711bf158..3f093bd814 100644 --- a/test/testcases/configs.py +++ b/test/testcases/configs.py @@ -66,6 +66,15 @@ DEFAULT_PARSER_CONFIG = { ], "method": "light", "batch_chunk_token_size": 4096, + "retry_attempts": 2, + "retry_backoff_seconds": 2.0, + "retry_backoff_max_seconds": 60.0, + "build_subgraph_timeout_per_chunk_seconds": 300, + "build_subgraph_min_timeout_seconds": 600, + "merge_timeout_seconds": 180, + "resolution_timeout_seconds": 1800, + "community_timeout_seconds": 1800, + "lock_acquire_timeout_seconds": 600, }, "parent_child": { "use_parent_child": False,