From 488c3ef6a306cf11f73dd642c0e7fd0420c4001e Mon Sep 17 00:00:00 2001 From: Wang Qi Date: Mon, 27 Apr 2026 19:16:37 +0800 Subject: [PATCH] Add task API (#14393) ### What problem does this PR solve? Add task API ### Type of change - [x] Refactor --- api/apps/canvas_app.py | 29 -------- api/apps/restful_apis/task_api.py | 117 ++++++++++++++++++++++++++++++ web/src/services/agent-service.ts | 4 +- web/src/utils/api.ts | 4 +- 4 files changed, 121 insertions(+), 33 deletions(-) delete mode 100644 api/apps/canvas_app.py create mode 100644 api/apps/restful_apis/task_api.py diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py deleted file mode 100644 index 811d9870f9..0000000000 --- a/api/apps/canvas_app.py +++ /dev/null @@ -1,29 +0,0 @@ -# -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import logging -from api.utils.api_utils import get_json_result -from rag.utils.redis_conn import REDIS_CONN -from api.apps import login_required - - -@manager.route('/cancel/', methods=['PUT']) # noqa: F821 -@login_required -def cancel(task_id): - try: - REDIS_CONN.set(f"{task_id}-cancel", "x") - except Exception as e: - logging.exception(e) - return get_json_result(data=True) diff --git a/api/apps/restful_apis/task_api.py b/api/apps/restful_apis/task_api.py new file mode 100644 index 0000000000..69ff7dd405 --- /dev/null +++ b/api/apps/restful_apis/task_api.py @@ -0,0 +1,117 @@ +# +# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +from datetime import datetime + +from api.apps import login_required +from api.db.services.task_service import TaskService, CANVAS_DEBUG_DOC_ID, GRAPH_RAPTOR_FAKE_DOC_ID +from api.utils.api_utils import ( + get_data_error_result, + get_json_result, + get_request_json, + validate_request, +) +from common.constants import RetCode, TaskStatus +from rag.utils.redis_conn import REDIS_CONN + + +@manager.route("/tasks//cancel", methods=["POST"]) # noqa: F821 +@login_required +async def cancel_task(task_id): + """Cancel a running task. + """ + return await _cancel_task(task_id) + + +@manager.route("/tasks/", methods=["PATCH"]) # noqa: F821 +@login_required +@validate_request("action") +async def patch_task(task_id): + req = await get_request_json() + action = req.get("action") + + if action != "stop": + return get_json_result( + code=RetCode.ARGUMENT_ERROR, + message=f"Invalid action '{action}'. Only 'stop' is supported.", + ) + + return await _cancel_task(task_id) + + +async def _cancel_task(task_id): + """ + Sets a Redis cancel flag, updates the task progress to -1 (cancelled), + and marks the associated document's run status as CANCEL if applicable. + """ + exists, task = TaskService.get_by_id(task_id) + if not exists: + return get_data_error_result( + code=RetCode.NOT_FOUND, + message=f"Task '{task_id}' not found.", + ) + + # A task is stoppable if it hasn't completed (progress < 1) and isn't already + # in a failed/cancelled state (progress >= 0). progress == -1 means the task + # previously failed or was cancelled. + if task.progress < 0: + return get_data_error_result( + message="Task is already in a cancelled or failed state.", + ) + if task.progress >= 1: + return get_data_error_result( + message="Task has already completed and cannot be stopped.", + ) + + try: + REDIS_CONN.set(f"{task_id}-cancel", "x") + except Exception as e: + logging.exception("Failed to set cancel flag for task %s: %s", task_id, str(e)) + return get_json_result( + code=RetCode.CONNECTION_ERROR, + message="Failed to stop task", + ) + + # Append a cancellation message so the user can see it in progress_msg. + try: + cancel_msg = f"\n{datetime.now().strftime('%H:%M:%S')} Task stopped by user." + # Only transition to -1 if the task is still in a non-terminal state, + # mirroring TaskService.update_progress semantics. + TaskService.model.update( + progress_msg=TaskService.model.progress_msg + cancel_msg, + progress=-1, + ).where( + (TaskService.model.id == task_id) + & (TaskService.model.progress >= 0) + & (TaskService.model.progress < 1) + ).execute() + except Exception as e: + logging.warning("Failed to update task %s progress after cancellation: %s", task_id, str(e)) + + # If the task belongs to a document, also mark the document's run status as + # cancelled so that the UI reflects the state correctly. + try: + from api.db.services.document_service import DocumentService + doc_id = task.doc_id + if doc_id and doc_id not in (CANVAS_DEBUG_DOC_ID, GRAPH_RAPTOR_FAKE_DOC_ID): + _, doc = DocumentService.get_by_id(doc_id) + if doc and str(doc.run) in (TaskStatus.RUNNING.value, TaskStatus.SCHEDULE.value): + DocumentService.update_by_id(doc_id, {"run": TaskStatus.CANCEL.value, "progress": 0}) + except Exception as e: + logging.warning("Failed to update document run status for task %s: %s", task_id, str(e)) + + logging.info(f"Cancel task succeeded: task_id={task_id} doc_id={task.doc_id}") + return get_json_result(data=True) diff --git a/web/src/services/agent-service.ts b/web/src/services/agent-service.ts index 0c43b93983..4a4f59daaf 100644 --- a/web/src/services/agent-service.ts +++ b/web/src/services/agent-service.ts @@ -107,11 +107,11 @@ const methods = { }, cancelDataflow: { url: cancelDataflow, - method: 'put', + method: 'post', }, cancelCanvas: { url: cancelCanvas, - method: 'put', + method: 'post', }, createAgentSession: { url: api.createAgentSession, diff --git a/web/src/utils/api.ts b/web/src/utils/api.ts index e1fde6fd5f..18af8ea2db 100644 --- a/web/src/utils/api.ts +++ b/web/src/utils/api.ts @@ -196,7 +196,7 @@ export default { `${restAPIv1}/agents/${agentId}/components/${componentId}/debug`, trace: (agentId: string, messageId: string) => `${restAPIv1}/agents/${agentId}/logs/${messageId}`, - cancelCanvas: (taskId: string) => `${webAPI}/canvas/cancel/${taskId}`, // cancel conversation + cancelCanvas: (taskId: string) => `${restAPIv1}/tasks/${taskId}/cancel`, // agent inputForm: (agentId: string, componentId: string) => `${restAPIv1}/agents/${agentId}/components/${componentId}/input-form`, @@ -215,7 +215,7 @@ export default { fetchExternalAgentInputs: (canvasId: string) => `${restAPIv1}/agentbots/${canvasId}/inputs`, prompt: `${restAPIv1}/agents/prompts`, - cancelDataflow: (id: string) => `${webAPI}/canvas/cancel/${id}`, + cancelDataflow: (id: string) => `${restAPIv1}/tasks/${id}/cancel`, downloadFile: `${restAPIv1}/agents/download`, testWebhook: (id: string) => `${restAPIv1}/agents/${id}/webhook/test`, fetchWebhookTrace: (id: string) => `${restAPIv1}/agents/${id}/webhook/logs`,