mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-04 09:39:32 +08:00
Fix bug: run Knowledge graph or RAPTOR, it will update an existing task (#14102)
### What problem does this PR solve? It fixed the bug: https://github.com/infiniflow/ragflow/issues/14101 When run Knowledge graph or RAPTOR, the last document running status will be wrongly set, see below: It should never touch existing document result.  ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -773,7 +773,7 @@ async def run_mindmap():
|
||||
sample_document = documents[0]
|
||||
document_ids = [document["id"] for document in documents]
|
||||
|
||||
task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="mindmap", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
|
||||
task_id = queue_raptor_o_graphrag_tasks(sample_doc=sample_document, ty="mindmap", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
|
||||
|
||||
if not KnowledgebaseService.update_by_id(kb.id, {"mindmap_task_id": task_id}):
|
||||
logging.warning(f"Cannot save mindmap_task_id for kb {kb_id}")
|
||||
|
||||
@@ -444,7 +444,7 @@ def run_graphrag(dataset_id: str, tenant_id: str):
|
||||
sample_document = documents[0]
|
||||
document_ids = [document["id"] for document in documents]
|
||||
|
||||
task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="graphrag", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
|
||||
task_id = queue_raptor_o_graphrag_tasks(sample_doc=sample_document, ty="graphrag", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
|
||||
|
||||
if not KnowledgebaseService.update_by_id(kb.id, {"graphrag_task_id": task_id}):
|
||||
logging.warning(f"Cannot save graphrag_task_id for Dataset {dataset_id}")
|
||||
@@ -523,7 +523,7 @@ def run_raptor(dataset_id: str, tenant_id: str):
|
||||
sample_document = documents[0]
|
||||
document_ids = [document["id"] for document in documents]
|
||||
|
||||
task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="raptor", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
|
||||
task_id = queue_raptor_o_graphrag_tasks(sample_doc=sample_document, ty="raptor", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
|
||||
|
||||
if not KnowledgebaseService.update_by_id(kb.id, {"raptor_task_id": task_id}):
|
||||
logging.warning(f"Cannot save raptor_task_id for Dataset {dataset_id}")
|
||||
|
||||
@@ -984,23 +984,22 @@ class DocumentService(CommonService):
|
||||
queue_tasks(doc, bucket, name, 0)
|
||||
|
||||
|
||||
def queue_raptor_o_graphrag_tasks(sample_doc_id, ty, priority, fake_doc_id="", doc_ids=[]):
|
||||
def queue_raptor_o_graphrag_tasks(sample_doc, ty, priority, fake_doc_id="", doc_ids=[]):
|
||||
"""
|
||||
You can provide a fake_doc_id to bypass the restriction of tasks at the knowledgebase level.
|
||||
Optionally, specify a list of doc_ids to determine which documents participate in the task.
|
||||
"""
|
||||
assert ty in ["graphrag", "raptor", "mindmap"], "type should be graphrag, raptor or mindmap"
|
||||
|
||||
chunking_config = DocumentService.get_chunking_config(sample_doc_id["id"])
|
||||
chunking_config = DocumentService.get_chunking_config(sample_doc["id"])
|
||||
hasher = xxhash.xxh64()
|
||||
for field in sorted(chunking_config.keys()):
|
||||
hasher.update(str(chunking_config[field]).encode("utf-8"))
|
||||
|
||||
def new_task():
|
||||
nonlocal sample_doc_id
|
||||
return {
|
||||
"id": get_uuid(),
|
||||
"doc_id": sample_doc_id["id"],
|
||||
"doc_id": fake_doc_id,
|
||||
"from_page": 100000000,
|
||||
"to_page": 100000000,
|
||||
"task_type": ty,
|
||||
@@ -1015,9 +1014,8 @@ def queue_raptor_o_graphrag_tasks(sample_doc_id, ty, priority, fake_doc_id="", d
|
||||
task["digest"] = hasher.hexdigest()
|
||||
bulk_insert_into_db(Task, [task], True)
|
||||
|
||||
task["doc_id"] = fake_doc_id
|
||||
task["doc_ids"] = doc_ids
|
||||
DocumentService.begin2parse(sample_doc_id["id"], keep_progress=True)
|
||||
DocumentService.begin2parse(task["doc_id"], keep_progress=True)
|
||||
assert REDIS_CONN.queue_product(settings.get_svr_queue_name(priority), message=task), "Can't access Redis. Please check the Redis' status."
|
||||
return task["id"]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user