From bd99a22661b12ca8d8375f2b28512afb509a1ad3 Mon Sep 17 00:00:00 2001 From: dale053 Date: Wed, 13 May 2026 23:48:52 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20atomic=20chunk/token=20counter=20updates?= =?UTF-8?q?=20for=20documents=20and=20knowledge=20b=E2=80=A6=20(#14867)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Fixes #14866. Previously, `DocumentService.increment_chunk_num` and `decrement_chunk_num` updated the `Document` row and its parent `Knowledgebase` row in two separate, non-transactional statements. If the second update failed (DB error, connection drop, etc.) after the first one succeeded, the document and knowledge base chunk/token counters would drift apart and stay inconsistent. There was also a behavioral asymmetry between the two methods: - `increment_chunk_num` only logged a warning when the document row was missing and returned a value that callers usually treated as success. - `decrement_chunk_num` raised `LookupError` in the same situation. This PR makes the counter updates atomic and aligns the missing-document behavior between the two methods: - Wrap the `Document` and `Knowledgebase` updates in `increment_chunk_num` / `decrement_chunk_num` inside a `DB.atomic()` block so both succeed or both roll back together. - Raise `LookupError` from `increment_chunk_num` when the target document no longer exists, matching `decrement_chunk_num`. - Update `reset_document_for_reparse` in `document_api_service.py` to catch the new `LookupError` and return a proper "Document not found!" API error instead of propagating the exception. No schema changes, no API contract changes for the success path; only the failure mode for a missing document during reparse is now a clean error response instead of an uncaught exception. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- api/apps/services/document_api_service.py | 17 +++-- api/db/services/document_service.py | 89 +++++++++++++++++++---- 2 files changed, 83 insertions(+), 23 deletions(-) diff --git a/api/apps/services/document_api_service.py b/api/apps/services/document_api_service.py index 59abbd2507..63c71ff4a2 100644 --- a/api/apps/services/document_api_service.py +++ b/api/apps/services/document_api_service.py @@ -122,13 +122,16 @@ def reset_document_for_reparse(doc, tenant_id, parser_id=None, pipeline_id=None) # Delete chunks from document store if doc.token_num > 0: - e = DocumentService.increment_chunk_num( - doc.id, - doc.kb_id, - doc.token_num * -1, - doc.chunk_num * -1, - doc.process_duration * -1, - ) + try: + e = DocumentService.increment_chunk_num( + doc.id, + doc.kb_id, + doc.token_num * -1, + doc.chunk_num * -1, + doc.process_duration * -1, + ) + except LookupError: + return get_error_data_result(message="Document not found!") if not e: return get_error_data_result(message="Document not found!") settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id) diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 44c9995c02..3d9bc09dbb 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -591,27 +591,84 @@ class DocumentService(CommonService): @classmethod @DB.connection_context() def increment_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration): - num = ( - cls.model.update(token_num=cls.model.token_num + token_num, chunk_num=cls.model.chunk_num + chunk_num, process_duration=cls.model.process_duration + duration) - .where(cls.model.id == doc_id) - .execute() - ) - if num == 0: - logging.warning("Document not found which is supposed to be there") - num = Knowledgebase.update(token_num=Knowledgebase.token_num + token_num, chunk_num=Knowledgebase.chunk_num + chunk_num).where(Knowledgebase.id == kb_id).execute() + """Atomically add chunk/token counters on the document and its knowledge base.""" + with DB.atomic(): + num = ( + cls.model.update( + token_num=cls.model.token_num + token_num, + chunk_num=cls.model.chunk_num + chunk_num, + process_duration=cls.model.process_duration + duration, + ) + .where((cls.model.id == doc_id) & (cls.model.kb_id == kb_id)) + .execute() + ) + if num == 0: + logging.error( + "increment_chunk_num: no document matched doc_id=%s kb_id=%s " + "token_num=%s chunk_num=%s duration=%s", + doc_id, + kb_id, + token_num, + chunk_num, + duration, + ) + raise LookupError("Document not found which is supposed to be there") + num = ( + Knowledgebase.update( + token_num=Knowledgebase.token_num + token_num, + chunk_num=Knowledgebase.chunk_num + chunk_num, + ) + .where(Knowledgebase.id == kb_id) + .execute() + ) + if num == 0: + logging.error( + "increment_chunk_num: no knowledgebase matched kb_id=%s for doc_id=%s " + "token_num=%s chunk_num=%s duration=%s", + kb_id, + doc_id, + token_num, + chunk_num, + duration, + ) + raise LookupError("Knowledgebase not found which is supposed to be there") return num @classmethod @DB.connection_context() def decrement_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration): - num = ( - cls.model.update(token_num=cls.model.token_num - token_num, chunk_num=cls.model.chunk_num - chunk_num, process_duration=cls.model.process_duration + duration) - .where(cls.model.id == doc_id) - .execute() - ) - if num == 0: - raise LookupError("Document not found which is supposed to be there") - num = Knowledgebase.update(token_num=Knowledgebase.token_num - token_num, chunk_num=Knowledgebase.chunk_num - chunk_num).where(Knowledgebase.id == kb_id).execute() + """Atomically subtract chunk/token counters on the document and its knowledge base.""" + with DB.atomic(): + num = ( + cls.model.update( + token_num=cls.model.token_num - token_num, + chunk_num=cls.model.chunk_num - chunk_num, + process_duration=cls.model.process_duration + duration, + ) + .where((cls.model.id == doc_id) & (cls.model.kb_id == kb_id)) + .execute() + ) + if num == 0: + raise LookupError("Document not found which is supposed to be there") + num = ( + Knowledgebase.update( + token_num=Knowledgebase.token_num - token_num, + chunk_num=Knowledgebase.chunk_num - chunk_num, + ) + .where(Knowledgebase.id == kb_id) + .execute() + ) + if num == 0: + logging.error( + "decrement_chunk_num: no knowledgebase matched kb_id=%s for doc_id=%s " + "token_num=%s chunk_num=%s duration=%s", + kb_id, + doc_id, + token_num, + chunk_num, + duration, + ) + raise LookupError("Knowledgebase not found which is supposed to be there") return num @classmethod