mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Revert "fix: duplicate document ingest guard" (#15707)
Reverts infiniflow/ragflow#15638
This commit is contained in:
@@ -38,7 +38,7 @@ from api.db.services.file2document_service import File2DocumentService
|
||||
from api.db.services.file_service import FileService
|
||||
from api.db.services.knowledgebase_service import KnowledgebaseService
|
||||
from api.common.check_team_permission import check_kb_team_permission
|
||||
from api.db.services.task_service import TaskService, cancel_all_task_of, has_canceled
|
||||
from api.db.services.task_service import TaskService, cancel_all_task_of
|
||||
from api.utils.api_utils import construct_json_result, get_data_error_result, get_error_data_result, get_result, get_json_result, \
|
||||
server_error_response, add_tenant_id_to_kwargs, get_request_json, get_error_argument_result, check_duplicate_ids
|
||||
from api.utils.pagination_utils import validate_rest_api_page_size
|
||||
@@ -1397,30 +1397,17 @@ def _run_sync(user_id:str, req):
|
||||
if not e:
|
||||
return RetCode.DATA_ERROR, "Document not found!"
|
||||
|
||||
if str(req["run"]) == TaskStatus.RUNNING.value:
|
||||
tasks = list(TaskService.query(doc_id=doc_id))
|
||||
has_active_task = any((task.progress or 0) < 1 and not has_canceled(task.id) for task in tasks)
|
||||
if str(doc.run) in [TaskStatus.RUNNING.value, TaskStatus.SCHEDULE.value] or has_active_task:
|
||||
return RetCode.DATA_ERROR, "Document is already running"
|
||||
|
||||
should_cancel = False
|
||||
if str(req["run"]) == TaskStatus.CANCEL.value:
|
||||
tasks = list(TaskService.query(doc_id=doc_id))
|
||||
has_unfinished_task = any((task.progress or 0) < 1 for task in tasks)
|
||||
if str(doc.run) in [TaskStatus.RUNNING.value, TaskStatus.CANCEL.value] or has_unfinished_task:
|
||||
should_cancel = True
|
||||
cancel_all_task_of(doc_id)
|
||||
else:
|
||||
return RetCode.DATA_ERROR, "Cannot cancel a task that is not in RUNNING status"
|
||||
if all([rerun_with_delete, str(doc.run) == TaskStatus.DONE.value]):
|
||||
DocumentService.clear_chunk_num_when_rerun(doc_id)
|
||||
|
||||
affected_rows = DocumentService.update_by_id_if_update_time(doc_id, doc.update_time, info)
|
||||
if not affected_rows:
|
||||
return RetCode.DATA_ERROR, "Document is already running"
|
||||
|
||||
if str(req["run"]) == TaskStatus.CANCEL.value and should_cancel:
|
||||
cancel_all_task_of(doc_id)
|
||||
|
||||
DocumentService.update_by_id(doc_id, info)
|
||||
if req.get("delete", False):
|
||||
TaskService.filter_delete([Task.doc_id == doc_id])
|
||||
if settings.docStoreConn.index_exist(search.index_name(doc_tenant_id), doc.kb_id):
|
||||
|
||||
@@ -276,26 +276,6 @@ class CommonService:
|
||||
num = cls.model.update(data).where(cls.model.id == pid).execute()
|
||||
return num
|
||||
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
@retry_db_operation
|
||||
def update_by_id_if_update_time(cls, pid, update_time, data):
|
||||
# Update a single record by ID only if update_time matches the expected value.
|
||||
# Args:
|
||||
# pid: Record ID
|
||||
# update_time: Expected update_time value for optimistic locking
|
||||
# data: Updated field values
|
||||
# Returns:
|
||||
# Number of records updated
|
||||
data["update_time"] = current_timestamp()
|
||||
data["update_date"] = datetime_format(datetime.now())
|
||||
num = (
|
||||
cls.model.update(data)
|
||||
.where(cls.model.id == pid, cls.model.update_time == update_time)
|
||||
.execute()
|
||||
)
|
||||
return num
|
||||
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_by_id(cls, pid):
|
||||
|
||||
Reference in New Issue
Block a user