diff --git a/api/apps/services/document_api_service.py b/api/apps/services/document_api_service.py
index 59abbd2507..63c71ff4a2 100644
--- a/api/apps/services/document_api_service.py
+++ b/api/apps/services/document_api_service.py
@@ -122,13 +122,16 @@ def reset_document_for_reparse(doc, tenant_id, parser_id=None, pipeline_id=None)
 
     # Delete chunks from document store
     if doc.token_num > 0:
-        e = DocumentService.increment_chunk_num(
-            doc.id,
-            doc.kb_id,
-            doc.token_num * -1,
-            doc.chunk_num * -1,
-            doc.process_duration * -1,
-        )
+        try:
+            e = DocumentService.increment_chunk_num(
+                doc.id,
+                doc.kb_id,
+                doc.token_num * -1,
+                doc.chunk_num * -1,
+                doc.process_duration * -1,
+            )
+        except LookupError:
+            return get_error_data_result(message="Document not found!")
         if not e:
             return get_error_data_result(message="Document not found!")
         settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index 44c9995c02..3d9bc09dbb 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -591,27 +591,106 @@ class DocumentService(CommonService):
     @classmethod
     @DB.connection_context()
     def increment_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
-        num = (
-            cls.model.update(token_num=cls.model.token_num + token_num, chunk_num=cls.model.chunk_num + chunk_num, process_duration=cls.model.process_duration + duration)
-            .where(cls.model.id == doc_id)
-            .execute()
-        )
-        if num == 0:
-            logging.warning("Document not found which is supposed to be there")
-        num = Knowledgebase.update(token_num=Knowledgebase.token_num + token_num, chunk_num=Knowledgebase.chunk_num + chunk_num).where(Knowledgebase.id == kb_id).execute()
+        """Atomically add chunk/token counters on the document and its knowledge base.
+
+        Both updates run inside one DB.atomic() block; if the knowledge base row is missing, the raised LookupError rolls the document update back.
+
+        Raises:
+            LookupError: no document matched (doc_id, kb_id), or no knowledge base matched kb_id.
+        """
+        with DB.atomic():
+            num = (
+                cls.model.update(
+                    token_num=cls.model.token_num + token_num,
+                    chunk_num=cls.model.chunk_num + chunk_num,
+                    process_duration=cls.model.process_duration + duration,
+                )
+                .where((cls.model.id == doc_id) & (cls.model.kb_id == kb_id))
+                .execute()
+            )
+            if num == 0:
+                logging.error(
+                    "increment_chunk_num: no document matched doc_id=%s kb_id=%s "
+                    "token_num=%s chunk_num=%s duration=%s",
+                    doc_id,
+                    kb_id,
+                    token_num,
+                    chunk_num,
+                    duration,
+                )
+                raise LookupError("Document not found which is supposed to be there")
+            num = (
+                Knowledgebase.update(
+                    token_num=Knowledgebase.token_num + token_num,
+                    chunk_num=Knowledgebase.chunk_num + chunk_num,
+                )
+                .where(Knowledgebase.id == kb_id)
+                .execute()
+            )
+            if num == 0:
+                logging.error(
+                    "increment_chunk_num: no knowledgebase matched kb_id=%s for doc_id=%s "
+                    "token_num=%s chunk_num=%s duration=%s",
+                    kb_id,
+                    doc_id,
+                    token_num,
+                    chunk_num,
+                    duration,
+                )
+                raise LookupError("Knowledgebase not found which is supposed to be there")
         return num
 
     @classmethod
     @DB.connection_context()
     def decrement_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
-        num = (
-            cls.model.update(token_num=cls.model.token_num - token_num, chunk_num=cls.model.chunk_num - chunk_num, process_duration=cls.model.process_duration + duration)
-            .where(cls.model.id == doc_id)
-            .execute()
-        )
-        if num == 0:
-            raise LookupError("Document not found which is supposed to be there")
-        num = Knowledgebase.update(token_num=Knowledgebase.token_num - token_num, chunk_num=Knowledgebase.chunk_num - chunk_num).where(Knowledgebase.id == kb_id).execute()
+        """Atomically subtract chunk/token counters on the document and its knowledge base.
+
+        Both updates run inside one DB.atomic() block; if the knowledge base row is missing, the raised LookupError rolls the document update back.
+        NOTE(review): duration is added to process_duration here (same as the pre-patch code), unlike the subtracted counters - confirm intended.
+
+        Raises:
+            LookupError: no document matched (doc_id, kb_id), or no knowledge base matched kb_id.
+        """
+        with DB.atomic():
+            num = (
+                cls.model.update(
+                    token_num=cls.model.token_num - token_num,
+                    chunk_num=cls.model.chunk_num - chunk_num,
+                    process_duration=cls.model.process_duration + duration,
+                )
+                .where((cls.model.id == doc_id) & (cls.model.kb_id == kb_id))
+                .execute()
+            )
+            if num == 0:
+                logging.error(
+                    "decrement_chunk_num: no document matched doc_id=%s kb_id=%s "
+                    "token_num=%s chunk_num=%s duration=%s",
+                    doc_id,
+                    kb_id,
+                    token_num,
+                    chunk_num,
+                    duration,
+                )
+                raise LookupError("Document not found which is supposed to be there")
+            num = (
+                Knowledgebase.update(
+                    token_num=Knowledgebase.token_num - token_num,
+                    chunk_num=Knowledgebase.chunk_num - chunk_num,
+                )
+                .where(Knowledgebase.id == kb_id)
+                .execute()
+            )
+            if num == 0:
+                logging.error(
+                    "decrement_chunk_num: no knowledgebase matched kb_id=%s for doc_id=%s "
+                    "token_num=%s chunk_num=%s duration=%s",
+                    kb_id,
+                    doc_id,
+                    token_num,
+                    chunk_num,
+                    duration,
+                )
+                raise LookupError("Knowledgebase not found which is supposed to be there")
         return num
 
     @classmethod