Files
ragflow/rag/graphrag/general/index.py
Preston Percival e8f19aa338 feat(graphrag): fix merge concurrency and add resume-from-checkpoint (#14238)
This PR addresses three related GraphRAG reliability issues that
together allow long-running GraphRAG tasks (10+ hours of LLM extraction)
to be resumed after a crash or pause without re-doing completed work. It
builds on #14096 (per-doc subgraph cache) and extends the same idea to
the resolution and community-detection phases.

Fixes #14236.

## 1. Fix concurrent merge crash

Long GraphRAG runs would crash near the end of entity resolution with:
```
RuntimeError: dictionary keys changed during iteration
```
in `Extractor._merge_graph_nodes`. Two changes:

- `rag/graphrag/general/extractor.py`: snapshot `graph.neighbors(node1)`
via `list(...)` before iterating, so concurrent `add_edge` /
`remove_node` mutations on the shared `nx.Graph` cannot invalidate the
iterator. Also tracks each redirected neighbour in `node0_neighbors` so
a later merged node sharing the same external neighbour takes the
edge-merge branch instead of overwriting via `add_edge`.
- `rag/graphrag/entity_resolution.py`: serialize the merge step with a
dedicated `asyncio.Semaphore(1)`. `nx.Graph` is not thread-safe and
concurrent merges on overlapping neighbourhoods can produce incorrect
results even with the snapshot fix.

## 2. Don't wipe partial graph on pause

Previously the pause / cancel UI path called
`settings.docStoreConn.delete({"knowledge_graph_kwd": [...]}, ...)`,
destroying every subgraph, entity, relation, and graph row.
Re-triggering then started GraphRAG from scratch even though #14096 had
already added `load_subgraph_from_store`.

After main was merged in (which deleted `api/apps/kb_app.py` per
#14394), the pause path now lives on the new REST surface `DELETE
/v1/datasets/<id>/<index_type>`:

- `api/apps/services/dataset_api_service.py`: `delete_index` accepts a
`wipe: bool = True` parameter. When `False` the doc-store rows and
GraphRAG phase markers are left intact and only the running task is
cancelled. Default preserves historical behaviour.
- `api/apps/restful_apis/dataset_api.py`: parses `?wipe=false|0|no|off`
from the query string and forwards it.
- `web/src/utils/api.ts` + `web/src/services/knowledge-service.ts`:
`unbindPipelineTask` appends `?wipe=false` when explicitly false.
- The GraphRAG pause action in
`web/src/pages/dataset/dataset/generate-button/hook.ts` passes `wipe:
false` for `KnowledgeGraph`; raptor is unchanged.

**UX impact:** the pause icon next to a running GraphRAG task no longer
wipes graph data. The only path that still wipes is the explicit Delete
action in `GenerateLogButton` (trash icon behind a confirmation modal).

## 3. Phase-completion markers (`rag/graphrag/phase_markers.py`)

A small Redis-backed marker layer at
`graphrag:phase:{kb_id}:{resolution_done|community_done}` (7-day TTL).
`run_graphrag_for_kb` consults the markers on entry and skips phases
that already completed in a prior run. Markers are cleared automatically
when:
- new docs are merged into the graph (which invalidates prior resolution
and community results),
- `delete_index` wipes the graph, or
- `delete_knowledge_graph` is called.

Redis failures never block a run -- markers are an optimization, not a
gate.

## 4. Idempotent community detection

`extract_community` previously did `delete-then-insert` on
`community_report` rows; a crash mid-insert left the dataset with no
reports. Now report IDs are derived deterministically from `(kb_id,
community.title)`, the existing report IDs are snapshotted before
insert, new rows are written, then only stale rows are pruned. A failure
at any step leaves either the prior or the new report set intact --
never a partial mix.

## 5. Tunable doc-store insert pipeline

The GraphRAG insert loop in `rag/graphrag/utils.py` and the
`community_report` insert in `rag/graphrag/general/index.py` were both
hardcoded to `es_bulk_size = 4` and ran strictly sequentially. On a real
KB this meant 1077 chunks took ~21 minutes for a 100-chunk slice -- pure
round-trip overhead.

- New `insert_chunks_bounded()` helper in `rag/graphrag/utils.py`
batches inserts via a bounded `asyncio.Semaphore`. Same retry / timeout
semantics as the prior loop.
- Defaults: 64 docs per batch, 4 batches in flight (matches the regular
ingest pipeline in `document_service.py`). Tunable per-deployment via
`GRAPHRAG_INSERT_BULK_SIZE` and `GRAPHRAG_INSERT_CONCURRENCY`.
- Both `set_graph` and `extract_community` now use the helper.

This dropped the same 1077-chunk insert from minutes to seconds in local
testing without measurable extra pressure on Infinity (total in-flight
docs ≤ `BULK_SIZE × CONCURRENCY` = 256 by default).

## Tests

- `test/unit_test/rag/graphrag/test_merge_graph_nodes.py` (3 tests):
dense neighbourhood merge, neighbour-snapshot regression, concurrent
serialized merges.
- `test/unit_test/rag/graphrag/test_phase_markers.py` (4 tests): set/has
round-trip, kb-scoped clear, no-op on empty input, graceful Redis
failure.
-
`test/testcases/test_web_api/test_dataset_management/test_dataset_sdk_routes_unit.py`:
new `test_delete_index_wipe_flag_unit` covers `wipe=false` for both
GraphRAG and raptor on the new REST route, and confirms the default
still wipes and clears phase markers.

## Compatibility

- Backward compatible: tasks queued before this change behave
identically (default `wipe=true`, no markers expected).
- No schema/migration changes; all new state lives in Redis.
- New optional REST query param `wipe` on `DELETE
/v1/datasets/<id>/<index_type>`.
- New optional env vars `GRAPHRAG_INSERT_BULK_SIZE` and
`GRAPHRAG_INSERT_CONCURRENCY`; defaults preserve safe behaviour.

## Example of resume

Screenshot below shows a test resuming knowledge graph generation after
applying the concurrency fix and re-deploying.

<img width="521" height="677" alt="image"
src="https://github.com/user-attachments/assets/9ef0d405-cbb3-420d-a1a1-e51f3e7e9b7a"
/>

### Type of change

- [X] Bug Fix (non-breaking change which fixes an issue)
- [ ] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
2026-05-06 15:01:01 +08:00

753 lines
28 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import json
import logging
import os
import networkx as nx
from api.db.services.document_service import DocumentService
from api.db.services.task_service import has_canceled
from common.exceptions import TaskCanceledException
from common.connection_utils import timeout
from rag.graphrag.entity_resolution import EntityResolution
from rag.graphrag.general.community_reports_extractor import CommunityReportsExtractor
from rag.graphrag.general.extractor import Extractor
from rag.graphrag.general.graph_extractor import GraphExtractor as GeneralKGExt
from rag.graphrag.light.graph_extractor import GraphExtractor as LightKGExt
from rag.graphrag.phase_markers import (
PHASE_COMMUNITY,
PHASE_RESOLUTION,
clear_phase_markers,
has_phase_marker,
set_phase_marker,
)
from rag.graphrag.utils import (
GraphChange,
chunk_id,
does_graph_contains,
get_graph,
graph_merge,
insert_chunks_bounded,
set_graph,
tidy_graph,
)
from common.misc_utils import thread_pool_exec
from rag.nlp import rag_tokenizer, search
from rag.utils.redis_conn import RedisDistributedLock
from common import settings
from common.doc_store.doc_store_base import OrderByExpr
async def load_subgraph_from_store(tenant_id: str, kb_id: str, doc_id: str):
"""Load a previously saved subgraph from the doc store.
Filters directly by source_id (== doc_id) and knowledge_graph_kwd in the
query so the doc store index does the heavy lifting. Expects at most one
matching chunk per doc_id (as written by generate_subgraph).
Returns a networkx Graph on hit, or None on miss.
"""
fields = ["content_with_weight", "source_id"]
condition = {
"knowledge_graph_kwd": ["subgraph"],
"removed_kwd": "N",
"source_id": [doc_id],
}
try:
res = await thread_pool_exec(
settings.docStoreConn.search,
fields, [], condition, [], OrderByExpr(),
0, 1, search.index_name(tenant_id), [kb_id]
)
field_map = settings.docStoreConn.get_fields(res, fields)
for cid, row in field_map.items():
content = row.get("content_with_weight", "")
if not content:
continue
try:
data = json.loads(content)
sg = nx.node_link_graph(data, edges="edges")
sg.graph["source_id"] = [doc_id]
logging.info(
"Checkpoint hit: subgraph for doc %s (tenant=%s kb=%s) found at chunk %s",
doc_id, tenant_id, kb_id, cid,
)
return sg
except Exception:
logging.exception(
"Failed to parse subgraph JSON for doc %s chunk %s", doc_id, cid
)
except Exception:
logging.exception("Failed to load subgraph from store for doc %s", doc_id)
return None
logging.info(
"Checkpoint miss: no subgraph for doc %s (tenant=%s kb=%s)",
doc_id, tenant_id, kb_id,
)
return None
async def run_graphrag(
row: dict,
language,
with_resolution: bool,
with_community: bool,
chat_model,
embedding_model,
callback,
):
enable_timeout_assertion = os.environ.get("ENABLE_TIMEOUT_ASSERTION")
start = asyncio.get_running_loop().time()
tenant_id, kb_id, doc_id = row["tenant_id"], str(row["kb_id"]), row["doc_id"]
chunks = []
for d in settings.retriever.chunk_list(doc_id, tenant_id, [kb_id], max_count=10000, fields=["content_with_weight", "doc_id"], sort_by_position=True):
chunks.append(d["content_with_weight"])
timeout_sec = max(120, len(chunks) * 60 * 10) if enable_timeout_assertion else 10000000000
try:
subgraph = await asyncio.wait_for(
generate_subgraph(
LightKGExt if "method" not in row["kb_parser_config"].get("graphrag", {})
or row["kb_parser_config"]["graphrag"]["method"] != "general"
else GeneralKGExt,
tenant_id,
kb_id,
doc_id,
chunks,
language,
row["kb_parser_config"]["graphrag"].get("entity_types", []),
chat_model,
embedding_model,
callback,
),
timeout=timeout_sec,
)
except asyncio.TimeoutError:
logging.error("generate_subgraph timeout")
raise
if not subgraph:
return
graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=1200)
await graphrag_task_lock.spin_acquire()
callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired")
try:
subgraph_nodes = set(subgraph.nodes())
new_graph = await merge_subgraph(
tenant_id,
kb_id,
doc_id,
subgraph,
embedding_model,
callback,
)
assert new_graph is not None
if not with_resolution and not with_community:
return
if with_resolution:
await graphrag_task_lock.spin_acquire()
callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired")
await resolve_entities(
new_graph,
subgraph_nodes,
tenant_id,
kb_id,
doc_id,
chat_model,
embedding_model,
callback,
task_id=row["id"],
)
if with_community:
await graphrag_task_lock.spin_acquire()
callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired")
await extract_community(
new_graph,
tenant_id,
kb_id,
doc_id,
chat_model,
embedding_model,
callback,
task_id=row["id"],
)
finally:
graphrag_task_lock.release()
now = asyncio.get_running_loop().time()
callback(msg=f"GraphRAG for doc {doc_id} done in {now - start:.2f} seconds.")
return
async def run_graphrag_for_kb(
row: dict,
doc_ids: list[str],
language: str,
kb_parser_config: dict,
chat_model,
embedding_model,
callback,
*,
with_resolution: bool = True,
with_community: bool = True,
max_parallel_docs: int = 4,
) -> dict:
tenant_id, kb_id = row["tenant_id"], row["kb_id"]
enable_timeout_assertion = os.environ.get("ENABLE_TIMEOUT_ASSERTION")
start = asyncio.get_running_loop().time()
fields_for_chunks = ["content_with_weight", "doc_id"]
if not doc_ids:
logging.info(f"Fetching all docs for {kb_id}")
docs, _ = DocumentService.get_by_kb_id(
kb_id=kb_id,
page_number=0,
items_per_page=0,
orderby="create_time",
desc=False,
keywords="",
run_status=[],
types=[],
suffix=[],
)
doc_ids = [doc["id"] for doc in docs]
doc_ids = list(dict.fromkeys(doc_ids))
if not doc_ids:
callback(msg=f"[GraphRAG] kb:{kb_id} has no processable doc_id.")
return {"ok_docs": [], "failed_docs": [], "total_docs": 0, "total_chunks": 0, "seconds": 0.0}
def load_doc_chunks(doc_id: str) -> list[str]:
from common.token_utils import num_tokens_from_string
chunks = []
current_chunk = ""
# DEBUG: Obtener todos los chunks primero
raw_chunks = list(settings.retriever.chunk_list(
doc_id,
tenant_id,
[kb_id],
max_count=10000, # FIX: Aumentar límite para procesar todos los chunks
fields=fields_for_chunks,
sort_by_position=True,
))
callback(msg=f"[DEBUG] chunk_list() returned {len(raw_chunks)} raw chunks for doc {doc_id}")
for d in raw_chunks:
content = d["content_with_weight"]
if num_tokens_from_string(current_chunk + content) < 4096:
current_chunk += content
else:
if current_chunk:
chunks.append(current_chunk)
current_chunk = content
if current_chunk:
chunks.append(current_chunk)
return chunks
all_doc_chunks: dict[str, list[str]] = {}
total_chunks = 0
for doc_id in doc_ids:
chunks = load_doc_chunks(doc_id)
all_doc_chunks[doc_id] = chunks
total_chunks += len(chunks)
if total_chunks == 0:
callback(msg=f"[GraphRAG] kb:{kb_id} has no available chunks in all documents, skip.")
return {"ok_docs": [], "failed_docs": doc_ids, "total_docs": len(doc_ids), "total_chunks": 0, "seconds": 0.0}
semaphore = asyncio.Semaphore(max_parallel_docs)
subgraphs: dict[str, object] = {}
failed_docs: list[tuple[str, str]] = [] # (doc_id, error)
async def build_one(doc_id: str):
if has_canceled(row["id"]):
callback(msg=f"Task {row['id']} cancelled, stopping execution.")
raise TaskCanceledException(f"Task {row['id']} was cancelled")
chunks = all_doc_chunks.get(doc_id, [])
if not chunks:
callback(msg=f"[GraphRAG] doc:{doc_id} has no available chunks, skip generation.")
return
kg_extractor = LightKGExt if ("method" not in kb_parser_config.get("graphrag", {}) or kb_parser_config["graphrag"]["method"] != "general") else GeneralKGExt
deadline = max(120, len(chunks) * 60 * 10) if enable_timeout_assertion else 10000000000
async with semaphore:
# CHECKPOINT: bounded by semaphore so doc-store lookups respect max_parallel_docs
existing_sg = await load_subgraph_from_store(tenant_id, kb_id, doc_id)
if existing_sg:
subgraphs[doc_id] = existing_sg
callback(msg=f"[GraphRAG] doc:{doc_id} subgraph found in store, skipping LLM extraction.")
return
try:
msg = f"[GraphRAG] build_subgraph doc:{doc_id}"
callback(msg=f"{msg} start (chunks={len(chunks)}, timeout={deadline}s)")
try:
sg = await asyncio.wait_for(
generate_subgraph(
kg_extractor,
tenant_id,
kb_id,
doc_id,
chunks,
language,
kb_parser_config.get("graphrag", {}).get("entity_types", []),
chat_model,
embedding_model,
callback,
task_id=row["id"]
),
timeout=deadline,
)
except asyncio.TimeoutError:
failed_docs.append((doc_id, "timeout"))
callback(msg=f"{msg} FAILED: timeout")
return
if sg:
subgraphs[doc_id] = sg
callback(msg=f"{msg} done")
else:
failed_docs.append((doc_id, "subgraph is empty"))
callback(msg=f"{msg} empty")
except TaskCanceledException as canceled:
callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} FAILED: {canceled}")
except Exception as e:
failed_docs.append((doc_id, repr(e)))
callback(msg=f"[GraphRAG] build_subgraph doc:{doc_id} FAILED: {e!r}")
if has_canceled(row["id"]):
callback(msg=f"Task {row['id']} cancelled before processing documents.")
raise TaskCanceledException(f"Task {row['id']} was cancelled")
tasks = [asyncio.create_task(build_one(doc_id)) for doc_id in doc_ids]
try:
await asyncio.gather(*tasks, return_exceptions=False)
except Exception as e:
logging.error(f"Error in asyncio.gather: {e}")
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
raise
if has_canceled(row["id"]):
callback(msg=f"Task {row['id']} cancelled after document processing.")
raise TaskCanceledException(f"Task {row['id']} was cancelled")
ok_docs = [d for d in doc_ids if d in subgraphs]
final_graph = None
# Determine whether the resolution/community phases still need to run on
# this KB. Markers from a prior task let us skip already-completed phases
# even when no new docs are merged this round (the resume path).
resolution_pending = with_resolution and not has_phase_marker(kb_id, PHASE_RESOLUTION)
community_pending = with_community and not has_phase_marker(kb_id, PHASE_COMMUNITY)
if not ok_docs and not resolution_pending and not community_pending:
callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs to merge and no phases pending, end.")
now = asyncio.get_running_loop().time()
return {"ok_docs": [], "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}
kb_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value="batch_merge", timeout=1200)
await kb_lock.spin_acquire()
callback(msg=f"[GraphRAG] kb:{kb_id} merge lock acquired")
if has_canceled(row["id"]):
callback(msg=f"Task {row['id']} cancelled before merging subgraphs.")
raise TaskCanceledException(f"Task {row['id']} was cancelled")
try:
union_nodes: set = set()
for doc_id in ok_docs:
sg = subgraphs[doc_id]
union_nodes.update(set(sg.nodes()))
new_graph = await merge_subgraph(
tenant_id,
kb_id,
doc_id,
sg,
embedding_model,
callback,
)
if new_graph is not None:
final_graph = new_graph
if ok_docs and final_graph is None:
callback(msg=f"[GraphRAG] kb:{kb_id} merge finished (no in-memory graph returned).")
elif ok_docs:
callback(msg=f"[GraphRAG] kb:{kb_id} merge finished, graph ready.")
# New content was merged into the global graph; any prior
# resolution/community results are now stale and must be redone
# on this or a future run. Clear phase markers accordingly.
clear_phase_markers(kb_id)
resolution_pending = with_resolution
community_pending = with_community
callback(msg=f"[GraphRAG] kb:{kb_id} cleared phase markers after merge.")
finally:
kb_lock.release()
if not with_resolution and not with_community:
now = asyncio.get_running_loop().time()
callback(msg=f"[GraphRAG] KB merge done in {now - start:.2f}s. ok={len(ok_docs)} / total={len(doc_ids)}")
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}
if not resolution_pending and not community_pending:
now = asyncio.get_running_loop().time()
callback(msg=f"[GraphRAG] kb:{kb_id} all requested phases already complete; nothing to do.")
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}
if has_canceled(row["id"]):
callback(msg=f"Task {row['id']} cancelled before resolution/community extraction.")
raise TaskCanceledException(f"Task {row['id']} was cancelled")
await kb_lock.spin_acquire()
callback(msg=f"[GraphRAG] kb:{kb_id} post-merge lock acquired for resolution/community")
try:
# Resume path: no docs were merged this round but pending phases
# require the previously-persisted graph. Load it from the doc store.
if final_graph is None:
final_graph = await get_graph(tenant_id, kb_id)
if final_graph is None:
callback(msg=f"[GraphRAG] kb:{kb_id} no persisted graph found; cannot run resolution/community.")
now = asyncio.get_running_loop().time()
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}
callback(msg=f"[GraphRAG] kb:{kb_id} loaded persisted graph for resume.")
subgraph_nodes = set()
for sg in subgraphs.values():
subgraph_nodes.update(set(sg.nodes()))
# On a pure-resume run (no new docs) the union of "newly added" nodes
# is empty, but resolution still needs *some* anchor set. Fall back to
# all graph nodes so candidate pairing actually finds something.
if not subgraph_nodes:
subgraph_nodes = set(final_graph.nodes())
if resolution_pending:
await resolve_entities(
final_graph,
subgraph_nodes,
tenant_id,
kb_id,
None,
chat_model,
embedding_model,
callback,
task_id=row["id"],
)
set_phase_marker(kb_id, PHASE_RESOLUTION)
elif with_resolution:
callback(msg=f"[GraphRAG] kb:{kb_id} resolution already completed previously, skipping.")
if community_pending:
await extract_community(
final_graph,
tenant_id,
kb_id,
None,
chat_model,
embedding_model,
callback,
task_id=row["id"],
)
set_phase_marker(kb_id, PHASE_COMMUNITY)
elif with_community:
callback(msg=f"[GraphRAG] kb:{kb_id} community detection already completed previously, skipping.")
finally:
kb_lock.release()
now = asyncio.get_running_loop().time()
callback(msg=f"[GraphRAG] GraphRAG for KB {kb_id} done in {now - start:.2f} seconds. ok={len(ok_docs)} failed={len(failed_docs)} total_docs={len(doc_ids)} total_chunks={total_chunks}")
return {
"ok_docs": ok_docs,
"failed_docs": failed_docs, # [(doc_id, error), ...]
"total_docs": len(doc_ids),
"total_chunks": total_chunks,
"seconds": now - start,
}
async def generate_subgraph(
extractor: Extractor,
tenant_id: str,
kb_id: str,
doc_id: str,
chunks: list[str],
language,
entity_types,
llm_bdl,
embed_bdl,
callback,
task_id: str = "",
):
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled during subgraph generation for doc {doc_id}.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
contains = await does_graph_contains(tenant_id, kb_id, doc_id)
if contains:
callback(msg=f"Graph already contains {doc_id}")
return None
start = asyncio.get_running_loop().time()
ext = extractor(
llm_bdl,
language=language,
entity_types=entity_types,
)
ents, rels = await ext(doc_id, chunks, callback, task_id=task_id)
subgraph = nx.Graph()
for ent in ents:
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled during entity processing for doc {doc_id}.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
assert "description" in ent, f"entity {ent} does not have description"
ent["source_id"] = [doc_id]
subgraph.add_node(ent["entity_name"], **ent)
ignored_rels = 0
for rel in rels:
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled during relationship processing for doc {doc_id}.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
assert "description" in rel, f"relation {rel} does not have description"
if not subgraph.has_node(rel["src_id"]) or not subgraph.has_node(rel["tgt_id"]):
ignored_rels += 1
continue
rel["source_id"] = [doc_id]
subgraph.add_edge(
rel["src_id"],
rel["tgt_id"],
**rel,
)
if ignored_rels:
callback(msg=f"ignored {ignored_rels} relations due to missing entities.")
tidy_graph(subgraph, callback, check_attribute=False)
subgraph.graph["source_id"] = [doc_id]
chunk = {
"content_with_weight": json.dumps(nx.node_link_data(subgraph, edges="edges"), ensure_ascii=False),
"knowledge_graph_kwd": "subgraph",
"kb_id": kb_id,
"source_id": [doc_id],
"available_int": 0,
"removed_kwd": "N",
}
cid = chunk_id(chunk)
await thread_pool_exec(settings.docStoreConn.delete,{"knowledge_graph_kwd": "subgraph", "source_id": doc_id},search.index_name(tenant_id),kb_id,)
await thread_pool_exec(settings.docStoreConn.insert,[{"id": cid, **chunk}],search.index_name(tenant_id),kb_id,)
now = asyncio.get_running_loop().time()
callback(msg=f"generated subgraph for doc {doc_id} in {now - start:.2f} seconds.")
return subgraph
@timeout(60 * 3)
async def merge_subgraph(
tenant_id: str,
kb_id: str,
doc_id: str,
subgraph: nx.Graph,
embedding_model,
callback,
):
start = asyncio.get_running_loop().time()
change = GraphChange()
old_graph = await get_graph(tenant_id, kb_id, subgraph.graph["source_id"])
if old_graph is not None:
logging.info("Merge with an exiting graph...................")
tidy_graph(old_graph, callback)
new_graph = graph_merge(old_graph, subgraph, change)
else:
new_graph = subgraph
change.added_updated_nodes = set(new_graph.nodes())
change.added_updated_edges = set(new_graph.edges())
pr = nx.pagerank(new_graph)
for node_name, pagerank in pr.items():
new_graph.nodes[node_name]["pagerank"] = pagerank
await set_graph(tenant_id, kb_id, embedding_model, new_graph, change, callback)
now = asyncio.get_running_loop().time()
callback(msg=f"merging subgraph for doc {doc_id} into the global graph done in {now - start:.2f} seconds.")
return new_graph
@timeout(60 * 30, 1)
async def resolve_entities(
graph,
subgraph_nodes: set[str],
tenant_id: str,
kb_id: str,
doc_id: str,
llm_bdl,
embed_bdl,
callback,
task_id: str = "",
):
# Check if task has been canceled before resolution
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled during entity resolution.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
start = asyncio.get_running_loop().time()
er = EntityResolution(
llm_bdl,
)
reso = await er(graph, subgraph_nodes, callback=callback, task_id=task_id)
graph = reso.graph
change = reso.change
callback(msg=f"Graph resolution removed {len(change.removed_nodes)} nodes and {len(change.removed_edges)} edges.")
callback(msg="Graph resolution updated pagerank.")
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled after entity resolution.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
await set_graph(tenant_id, kb_id, embed_bdl, graph, change, callback)
now = asyncio.get_running_loop().time()
callback(msg=f"Graph resolution done in {now - start:.2f}s.")
@timeout(60 * 30, 1)
async def extract_community(
graph,
tenant_id: str,
kb_id: str,
doc_id: str,
llm_bdl,
embed_bdl,
callback,
task_id: str = "",
):
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled before community extraction.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
start = asyncio.get_running_loop().time()
ext = CommunityReportsExtractor(
llm_bdl,
)
cr = await ext(graph, callback=callback, task_id=task_id)
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled during community extraction.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
community_structure = cr.structured_output
community_reports = cr.output
doc_ids = graph.graph["source_id"]
now = asyncio.get_running_loop().time()
callback(msg=f"Graph extracted {len(cr.structured_output)} communities in {now - start:.2f}s.")
start = now
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled during community indexing.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
chunks = []
for stru, rep in zip(community_structure, community_reports):
obj = {
"report": rep,
"evidences": "\n".join([f.get("explanation", "") for f in stru["findings"]]),
}
# Deterministic id derived from (kb_id, community title) so reruns of
# extract_community produce stable ids. Combined with insert-then-
# prune below, this means a crash mid-insert leaves the prior set of
# community reports intact -- never the partial-delete state the old
# delete-then-insert order produced.
chunk_payload_for_id = {
"content_with_weight": f"community_report::{stru['title']}",
"kb_id": kb_id,
}
chunk = {
"id": chunk_id(chunk_payload_for_id),
"docnm_kwd": stru["title"],
"title_tks": rag_tokenizer.tokenize(stru["title"]),
"content_with_weight": json.dumps(obj, ensure_ascii=False),
"content_ltks": rag_tokenizer.tokenize(obj["report"] + " " + obj["evidences"]),
"knowledge_graph_kwd": "community_report",
"weight_flt": stru["weight"],
"entities_kwd": stru["entities"],
"important_kwd": stru["entities"],
"kb_id": kb_id,
"source_id": list(doc_ids),
"available_int": 0,
}
chunk["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(chunk["content_ltks"])
chunks.append(chunk)
new_ids: set[str] = {c["id"] for c in chunks}
# Snapshot existing community_report ids BEFORE inserting so we can
# delete exactly the stale set afterwards. If the search fails we fall
# back to the prior delete-everything-then-insert behaviour rather than
# leaving an inconsistent mix.
old_ids: list[str] = []
try:
existing_res = await thread_pool_exec(
settings.docStoreConn.search,
["id"], [], {"knowledge_graph_kwd": ["community_report"]}, [], OrderByExpr(),
0, 10000, search.index_name(tenant_id), [kb_id],
)
existing_fields = settings.docStoreConn.get_fields(existing_res, ["id"])
old_ids = list(existing_fields.keys())
except Exception:
logging.exception("Failed to enumerate existing community reports for kb %s; falling back to delete-then-insert.", kb_id)
await thread_pool_exec(settings.docStoreConn.delete, {"knowledge_graph_kwd": "community_report", "kb_id": kb_id}, search.index_name(tenant_id), kb_id)
old_ids = []
await insert_chunks_bounded(chunks, tenant_id, kb_id, callback=callback, label="Insert community reports")
# Now that all new reports are persisted, prune stale rows. Anything in
# old_ids that is not also in new_ids is no longer current (community
# composition changed across runs). A failure here just leaves stale
# rows; the new rows are already in place.
stale_ids = [i for i in old_ids if i not in new_ids]
if stale_ids:
try:
await thread_pool_exec(
settings.docStoreConn.delete,
{"knowledge_graph_kwd": ["community_report"], "id": stale_ids},
search.index_name(tenant_id),
kb_id,
)
except Exception:
logging.exception("Failed to prune %d stale community reports for kb %s", len(stale_ids), kb_id)
if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled after community indexing.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
now = asyncio.get_running_loop().time()
callback(msg=f"Graph indexed {len(cr.structured_output)} communities in {now - start:.2f}s.")
return community_structure, community_reports