Fix: serialize GraphRAG entity resolution merges to avoid graph mutation races (#14237)

### What problem does this PR solve?

This PR fixes the merge-phase crash reported in #14236 during GraphRAG
entity resolution.

The issue happens after candidate pair resolution completes, when
multiple merge coroutines mutate the same shared `networkx` graph
concurrently. In `_merge_graph_nodes`, the code iterates over
`graph.neighbors(node1)` and also awaits during edge/description
merging. Each `await` yields control, so another coroutine can modify
the graph's adjacency structure mid-iteration, which can trigger
`RuntimeError: dictionary keys changed during iteration` and, more
generally, leaves the shared graph open to interleaved, unsynchronized
mutation.
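
The failure mode can be reproduced in isolation. The sketch below is not the project's code: it assumes a plain dict-of-dicts as a stand-in for the `networkx` adjacency structure, and `merge_unsafe` and `add_edge` are hypothetical names used only for illustration.

```python
import asyncio


async def merge_unsafe(adj, node):
    # Iterates the live adjacency dict for `node`, with an await inside the loop.
    for _neighbor in adj[node]:
        await asyncio.sleep(0)  # yield point, standing in for an awaited merge step


async def add_edge(adj, u, v):
    # A second coroutine mutating the same adjacency structure.
    adj[u][v] = {}
    adj.setdefault(v, {})[u] = {}


async def main():
    # Hypothetical toy graph: node -> {neighbor: edge_attributes}.
    adj = {"a": {"x": {}, "y": {}}, "x": {"a": {}}, "y": {"a": {}}}
    return await asyncio.gather(
        merge_unsafe(adj, "a"),
        add_edge(adj, "a", "z"),
        return_exceptions=True,  # capture the error instead of propagating it
    )


results = asyncio.run(main())
print(type(results[0]).__name__)
```

The first coroutine yields after reading one neighbor, the second adds an edge to the same node, and the resumed iteration fails with a `RuntimeError`. The exact message depends on how the dict was changed.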

This change keeps the PR scoped to that single issue by:
- serializing merge-time graph mutations with a dedicated merge lock
- snapshotting `graph.neighbors(node1)` with `list(...)` before
iteration

Together, these changes prevent concurrent mutation of the shared graph
during the merge phase and make the merge loop safe against live-view
invalidation.
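
A minimal sketch of the two measures combined, again assuming a dict-of-dicts stand-in for the shared graph rather than the real `_merge_graph_nodes` implementation. The `merge_safe` helper and the node names are hypothetical.

```python
import asyncio


async def merge_safe(adj, lock, keep, drop):
    # Only one merge may mutate the shared structure at a time.
    async with lock:
        # list(...) snapshots the neighbors so the loop body can edit adj[drop].
        for nbr in list(adj[drop]):
            await asyncio.sleep(0)  # yield point, standing in for an awaited merge step
            if nbr != keep:
                adj[keep][nbr] = {}
                adj[nbr][keep] = {}
            # Remove the old edge on both sides.
            del adj[drop][nbr]
            del adj[nbr][drop]
        del adj[drop]


async def main():
    # Hypothetical toy graph: "a2" is merged into "a", "b2" into "b".
    adj = {
        "a": {"x": {}},
        "a2": {"x": {}, "y": {}},
        "b": {"x": {}},
        "b2": {"x": {}, "z": {}},
        "x": {"a": {}, "a2": {}, "b": {}, "b2": {}},
        "y": {"a2": {}},
        "z": {"b2": {}},
    }
    lock = asyncio.Lock()
    await asyncio.gather(
        merge_safe(adj, lock, "a", "a2"),
        merge_safe(adj, lock, "b", "b2"),
    )
    return adj


final = asyncio.run(main())
print({node: sorted(nbrs) for node, nbrs in final.items()})
```

Both merges touch the adjacency of `x`, but the lock ensures the second one starts only after the first has finished. The snapshot matters even for a single merge here, because the loop deletes entries from `adj[drop]` while walking its neighbors.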

Fixes #14236

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
NeedmeFordev
2026-04-22 01:42:53 -07:00
committed by GitHub
parent e0f0eb277d
commit 38e45a1117


@@ -159,15 +159,16 @@ class EntityResolution(Extractor):
         connect_graph = nx.Graph()
         connect_graph.add_edges_from(resolution_result)
+        merge_lock = asyncio.Lock()
         async def limited_merge_nodes(graph, nodes, change):
             async with semaphore:
+                async with merge_lock:
                     await self._merge_graph_nodes(graph, nodes, change, task_id)
         tasks = []
         for sub_connect_graph in nx.connected_components(connect_graph):
             merging_nodes = list(sub_connect_graph)
-            tasks.append(asyncio.create_task(limited_merge_nodes(graph, merging_nodes, change))
-            )
+            tasks.append(asyncio.create_task(limited_merge_nodes(graph, merging_nodes, change)))
         try:
             await asyncio.gather(*tasks, return_exceptions=False)
         except Exception as e: