fix: atomic chunk/token counter updates for documents and knowledge b… (#14867)

### What problem does this PR solve?

Fixes #14866.

Previously, `DocumentService.increment_chunk_num` and
`decrement_chunk_num` updated the `Document` row and its parent
`Knowledgebase` row in two separate, non-transactional statements. If
the second update failed (DB error, connection drop, etc.) after the
first one succeeded, the document and knowledge base chunk/token
counters would drift apart and stay inconsistent.

There was also a behavioral asymmetry between the two methods:

- `increment_chunk_num` only logged a warning when the document row was
missing and returned a value that callers usually treated as success.
- `decrement_chunk_num` raised `LookupError` in the same situation.

This PR makes the counter updates atomic and aligns the missing-document
behavior between the two methods:

- Wrap the `Document` and `Knowledgebase` updates in
`increment_chunk_num` / `decrement_chunk_num` inside a `DB.atomic()`
block so both succeed or both roll back together.
- Raise `LookupError` from `increment_chunk_num` when the target
document no longer exists, matching `decrement_chunk_num`.
- Update `reset_document_for_reparse` in `document_api_service.py` to
catch the new `LookupError` and return a proper "Document not found!"
API error instead of propagating the exception.

No schema changes, no API contract changes for the success path; only
the failure mode for a missing document during reparse is now a clean
error response instead of an uncaught exception.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
dale053
2026-05-13 23:48:52 -07:00
committed by GitHub
parent 3c68ad03be
commit bd99a22661
2 changed files with 83 additions and 23 deletions

View File

@@ -122,13 +122,16 @@ def reset_document_for_reparse(doc, tenant_id, parser_id=None, pipeline_id=None)
# Delete chunks from document store
if doc.token_num > 0:
e = DocumentService.increment_chunk_num(
doc.id,
doc.kb_id,
doc.token_num * -1,
doc.chunk_num * -1,
doc.process_duration * -1,
)
try:
e = DocumentService.increment_chunk_num(
doc.id,
doc.kb_id,
doc.token_num * -1,
doc.chunk_num * -1,
doc.process_duration * -1,
)
except LookupError:
return get_error_data_result(message="Document not found!")
if not e:
return get_error_data_result(message="Document not found!")
settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)

View File

@@ -591,27 +591,84 @@ class DocumentService(CommonService):
@classmethod
@DB.connection_context()
def increment_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
num = (
cls.model.update(token_num=cls.model.token_num + token_num, chunk_num=cls.model.chunk_num + chunk_num, process_duration=cls.model.process_duration + duration)
.where(cls.model.id == doc_id)
.execute()
)
if num == 0:
logging.warning("Document not found which is supposed to be there")
num = Knowledgebase.update(token_num=Knowledgebase.token_num + token_num, chunk_num=Knowledgebase.chunk_num + chunk_num).where(Knowledgebase.id == kb_id).execute()
"""Atomically add chunk/token counters on the document and its knowledge base."""
with DB.atomic():
num = (
cls.model.update(
token_num=cls.model.token_num + token_num,
chunk_num=cls.model.chunk_num + chunk_num,
process_duration=cls.model.process_duration + duration,
)
.where((cls.model.id == doc_id) & (cls.model.kb_id == kb_id))
.execute()
)
if num == 0:
logging.error(
"increment_chunk_num: no document matched doc_id=%s kb_id=%s "
"token_num=%s chunk_num=%s duration=%s",
doc_id,
kb_id,
token_num,
chunk_num,
duration,
)
raise LookupError("Document not found which is supposed to be there")
num = (
Knowledgebase.update(
token_num=Knowledgebase.token_num + token_num,
chunk_num=Knowledgebase.chunk_num + chunk_num,
)
.where(Knowledgebase.id == kb_id)
.execute()
)
if num == 0:
logging.error(
"increment_chunk_num: no knowledgebase matched kb_id=%s for doc_id=%s "
"token_num=%s chunk_num=%s duration=%s",
kb_id,
doc_id,
token_num,
chunk_num,
duration,
)
raise LookupError("Knowledgebase not found which is supposed to be there")
return num
@classmethod
@DB.connection_context()
def decrement_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
num = (
cls.model.update(token_num=cls.model.token_num - token_num, chunk_num=cls.model.chunk_num - chunk_num, process_duration=cls.model.process_duration + duration)
.where(cls.model.id == doc_id)
.execute()
)
if num == 0:
raise LookupError("Document not found which is supposed to be there")
num = Knowledgebase.update(token_num=Knowledgebase.token_num - token_num, chunk_num=Knowledgebase.chunk_num - chunk_num).where(Knowledgebase.id == kb_id).execute()
"""Atomically subtract chunk/token counters on the document and its knowledge base."""
with DB.atomic():
num = (
cls.model.update(
token_num=cls.model.token_num - token_num,
chunk_num=cls.model.chunk_num - chunk_num,
process_duration=cls.model.process_duration + duration,
)
.where((cls.model.id == doc_id) & (cls.model.kb_id == kb_id))
.execute()
)
if num == 0:
raise LookupError("Document not found which is supposed to be there")
num = (
Knowledgebase.update(
token_num=Knowledgebase.token_num - token_num,
chunk_num=Knowledgebase.chunk_num - chunk_num,
)
.where(Knowledgebase.id == kb_id)
.execute()
)
if num == 0:
logging.error(
"decrement_chunk_num: no knowledgebase matched kb_id=%s for doc_id=%s "
"token_num=%s chunk_num=%s duration=%s",
kb_id,
doc_id,
token_num,
chunk_num,
duration,
)
raise LookupError("Knowledgebase not found which is supposed to be there")
return num
@classmethod