From ee1bb8a8b566daf002cd87b388325f9cd2112cef Mon Sep 17 00:00:00 2001 From: Idriss Sbaaoui <112825897+6ba3i@users.noreply.github.com> Date: Thu, 2 Apr 2026 18:50:56 +0800 Subject: [PATCH] Fix: overlapping document parse race that can clear chunks (#13900) ### What problem does this PR solve? This PR fixes a race in batch document parsing where overlapping parse requests for the same document could clear/rewrite chunk state and make previously parsed content appear lost. It adds an atomic per-document parse guard so only one parse can run at a time for that document (Fixes #13864 ). ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- api/apps/sdk/doc.py | 16 ++++++++++++---- .../test_doc_sdk_routes_unit.py | 6 +++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/api/apps/sdk/doc.py b/api/apps/sdk/doc.py index 77f89de233..bb6422c9f4 100644 --- a/api/apps/sdk/doc.py +++ b/api/apps/sdk/doc.py @@ -27,7 +27,7 @@ from quart import request, send_file from api.constants import FILE_NAME_LEN_LIMIT from api.db import FileType -from api.db.db_models import APIToken, File, Task +from api.db.db_models import APIToken, Document, File, Task from api.db.joint_services.tenant_model_service import get_model_config_by_id, get_model_config_by_type_and_name, get_tenant_default_model_by_type from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.document_service import DocumentService @@ -874,10 +874,18 @@ async def parse(tenant_id, dataset_id): continue if not doc: return get_error_data_result(message=f"You don't own the document {id}.") - if doc[0].run == TaskStatus.RUNNING.value: - return get_error_data_result("Can't parse document that is currently being processed") info = {"run": "1", "progress": 0, "progress_msg": "", "chunk_num": 0, "token_num": 0} - DocumentService.update_by_id(id, info) + if ( + DocumentService.filter_update( + [ + Document.id == id, + ((Document.run.is_null(True)) | (Document.run != TaskStatus.RUNNING.value)), + ], + info, + ) + == 0 + ): + return get_error_data_result("Can't parse document that is currently being processed") settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), dataset_id) TaskService.filter_delete([Task.doc_id == id]) e, doc = DocumentService.get_by_id(id) diff --git a/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py b/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py index cd4c2e9d23..41a538e386 100644 --- a/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py +++ b/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py @@ -747,14 +747,14 @@ class TestDocRoutesUnit: monkeypatch.setattr(module.DocumentService, "query", lambda **_kwargs: [_DummyDoc(run=module.TaskStatus.RUNNING.value)]) monkeypatch.setattr( module.DocumentService, - "update_by_id", - lambda *_args, **_kwargs: (_ for _ in ()).throw(AssertionError("update_by_id must not be called for running docs")), + "filter_update", + lambda *_args, **_kwargs: 0, ) res = _run(module.parse.__wrapped__("tenant-1", "ds-1")) assert "currently being processed" in res["message"] monkeypatch.setattr(module.DocumentService, "query", lambda **_kwargs: [_DummyDoc(progress=0)]) - monkeypatch.setattr(module.DocumentService, "update_by_id", lambda *_args, **_kwargs: True) + monkeypatch.setattr(module.DocumentService, "filter_update", lambda *_args, **_kwargs: 1) monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _id: (True, _DummyDoc())) monkeypatch.setattr(module.File2DocumentService, "get_storage_address", lambda **_kwargs: ("b", "n")) _patch_docstore(monkeypatch, module, delete=lambda *_args, **_kwargs: None)