mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-01 00:05:43 +08:00
Fix: overlapping document parse race that can clear chunks (#13900)
### What problem does this PR solve?

This PR fixes a race in batch document parsing where overlapping parse requests for the same document could clear or rewrite chunk state, making previously parsed content appear lost. It adds an atomic per-document parse guard so that only one parse can run at a time for a given document (fixes #13864).

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -27,7 +27,7 @@ from quart import request, send_file
|
||||
|
||||
from api.constants import FILE_NAME_LEN_LIMIT
|
||||
from api.db import FileType
|
||||
from api.db.db_models import APIToken, File, Task
|
||||
from api.db.db_models import APIToken, Document, File, Task
|
||||
from api.db.joint_services.tenant_model_service import get_model_config_by_id, get_model_config_by_type_and_name, get_tenant_default_model_by_type
|
||||
from api.db.services.doc_metadata_service import DocMetadataService
|
||||
from api.db.services.document_service import DocumentService
|
||||
@@ -874,10 +874,18 @@ async def parse(tenant_id, dataset_id):
|
||||
continue
|
||||
if not doc:
|
||||
return get_error_data_result(message=f"You don't own the document {id}.")
|
||||
if doc[0].run == TaskStatus.RUNNING.value:
|
||||
return get_error_data_result("Can't parse document that is currently being processed")
|
||||
info = {"run": "1", "progress": 0, "progress_msg": "", "chunk_num": 0, "token_num": 0}
|
||||
DocumentService.update_by_id(id, info)
|
||||
if (
|
||||
DocumentService.filter_update(
|
||||
[
|
||||
Document.id == id,
|
||||
((Document.run.is_null(True)) | (Document.run != TaskStatus.RUNNING.value)),
|
||||
],
|
||||
info,
|
||||
)
|
||||
== 0
|
||||
):
|
||||
return get_error_data_result("Can't parse document that is currently being processed")
|
||||
settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), dataset_id)
|
||||
TaskService.filter_delete([Task.doc_id == id])
|
||||
e, doc = DocumentService.get_by_id(id)
|
||||
|
||||
@@ -747,14 +747,14 @@ class TestDocRoutesUnit:
|
||||
monkeypatch.setattr(module.DocumentService, "query", lambda **_kwargs: [_DummyDoc(run=module.TaskStatus.RUNNING.value)])
|
||||
monkeypatch.setattr(
|
||||
module.DocumentService,
|
||||
"update_by_id",
|
||||
lambda *_args, **_kwargs: (_ for _ in ()).throw(AssertionError("update_by_id must not be called for running docs")),
|
||||
"filter_update",
|
||||
lambda *_args, **_kwargs: 0,
|
||||
)
|
||||
res = _run(module.parse.__wrapped__("tenant-1", "ds-1"))
|
||||
assert "currently being processed" in res["message"]
|
||||
|
||||
monkeypatch.setattr(module.DocumentService, "query", lambda **_kwargs: [_DummyDoc(progress=0)])
|
||||
monkeypatch.setattr(module.DocumentService, "update_by_id", lambda *_args, **_kwargs: True)
|
||||
monkeypatch.setattr(module.DocumentService, "filter_update", lambda *_args, **_kwargs: 1)
|
||||
monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _id: (True, _DummyDoc()))
|
||||
monkeypatch.setattr(module.File2DocumentService, "get_storage_address", lambda **_kwargs: ("b", "n"))
|
||||
_patch_docstore(monkeypatch, module, delete=lambda *_args, **_kwargs: None)
|
||||
|
||||
Reference in New Issue
Block a user