diff --git a/api/apps/file2document_app.py b/api/apps/file2document_app.py index 745988a97c..c82207ab73 100644 --- a/api/apps/file2document_app.py +++ b/api/apps/file2document_app.py @@ -14,6 +14,8 @@ # limitations under the License # +import asyncio +import logging from pathlib import Path from api.db.services.file2document_service import File2DocumentService @@ -28,6 +30,50 @@ from api.db import FileType from api.db.services.document_service import DocumentService +def _convert_files(file_ids, kb_ids, user_id): + """Synchronous worker: delete old docs and insert new ones for the given file/kb pairs.""" + for id in file_ids: + informs = File2DocumentService.get_by_file_id(id) + for inform in informs: + doc_id = inform.document_id + e, doc = DocumentService.get_by_id(doc_id) + if not e: + continue + tenant_id = DocumentService.get_tenant_id(doc_id) + if not tenant_id: + logging.warning("tenant_id not found for doc_id=%s, skipping remove_document", doc_id) + continue + DocumentService.remove_document(doc, tenant_id) + File2DocumentService.delete_by_file_id(id) + + e, file = FileService.get_by_id(id) + if not e: + continue + + for kb_id in kb_ids: + e, kb = KnowledgebaseService.get_by_id(kb_id) + if not e: + continue + doc = DocumentService.insert({ + "id": get_uuid(), + "kb_id": kb.id, + "parser_id": FileService.get_parser(file.type, file.name, kb.parser_id), + "pipeline_id": kb.pipeline_id, + "parser_config": kb.parser_config, + "created_by": user_id, + "type": file.type, + "name": file.name, + "suffix": Path(file.name).suffix.lstrip("."), + "location": file.location, + "size": file.size + }) + File2DocumentService.insert({ + "id": get_uuid(), + "file_id": id, + "document_id": doc.id, + }) + + @manager.route('/convert', methods=['POST']) # noqa: F821 @login_required @validate_request("file_ids", "kb_ids") @@ -35,66 +81,41 @@ async def convert(): req = await get_request_json() kb_ids = req["kb_ids"] file_ids = req["file_ids"] - file2documents = [] try: files = FileService.get_by_ids(file_ids) - files_set = dict({file.id: file for file in files}) + files_set = {file.id: file for file in files} + + # Validate all files exist before starting any work + for file_id in file_ids: + if not files_set.get(file_id): + return get_data_error_result(message="File not found!") + + # Validate all kb_ids exist before scheduling background work + for kb_id in kb_ids: + e, _ = KnowledgebaseService.get_by_id(kb_id) + if not e: + return get_data_error_result(message="Can't find this dataset!") + + # Expand folders to their innermost file IDs + all_file_ids = [] for file_id in file_ids: file = files_set[file_id] - if not file: - return get_data_error_result(message="File not found!") - file_ids_list = [file_id] if file.type == FileType.FOLDER.value: - file_ids_list = FileService.get_all_innermost_file_ids(file_id, []) - for id in file_ids_list: - informs = File2DocumentService.get_by_file_id(id) - # delete - for inform in informs: - doc_id = inform.document_id - e, doc = DocumentService.get_by_id(doc_id) - if not e: - return get_data_error_result(message="Document not found!") - tenant_id = DocumentService.get_tenant_id(doc_id) - if not tenant_id: - return get_data_error_result(message="Tenant not found!") - if not DocumentService.remove_document(doc, tenant_id): - return get_data_error_result( - message="Database error (Document removal)!") - File2DocumentService.delete_by_file_id(id) + all_file_ids.extend(FileService.get_all_innermost_file_ids(file_id, [])) + else: + all_file_ids.append(file_id) - # insert - for kb_id in kb_ids: - e, kb = KnowledgebaseService.get_by_id(kb_id) - if not e: - return get_data_error_result( - message="Can't find this dataset!") - e, file = FileService.get_by_id(id) - if not e: - return get_data_error_result( - message="Can't find this file!") - - doc = DocumentService.insert({ - "id": get_uuid(), - "kb_id": kb.id, - "parser_id": FileService.get_parser(file.type, file.name, kb.parser_id), - "pipeline_id": kb.pipeline_id, - "parser_config": kb.parser_config, - "created_by": current_user.id, - "type": file.type, - "name": file.name, - "suffix": Path(file.name).suffix.lstrip("."), - "location": file.location, - "size": file.size - }) - file2document = File2DocumentService.insert({ - "id": get_uuid(), - "file_id": id, - "document_id": doc.id, - }) - - file2documents.append(file2document.to_json()) - return get_json_result(data=file2documents) + user_id = current_user.id + # Run the blocking DB work in a thread so the event loop is not blocked. + # For large folders this prevents 504 Gateway Timeout by returning as + # soon as the background task is scheduled. + loop = asyncio.get_running_loop() + future = loop.run_in_executor(None, _convert_files, all_file_ids, kb_ids, user_id) + future.add_done_callback( + lambda f: logging.error("_convert_files failed: %s", f.exception()) if f.exception() else None + ) + return get_json_result(data=True) except Exception as e: return server_error_response(e) diff --git a/api/utils/tenant_utils.py b/api/utils/tenant_utils.py index cf362e3c23..83da91f1c4 100644 --- a/api/utils/tenant_utils.py +++ b/api/utils/tenant_utils.py @@ -13,12 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from common.constants import LLMType from api.db.services.tenant_llm_service import TenantLLMService +_KEY_TO_MODEL_TYPE = { + "llm_id": LLMType.CHAT, + "embd_id": LLMType.EMBEDDING, + "asr_id": LLMType.SPEECH2TEXT, + "img2txt_id": LLMType.IMAGE2TEXT, + "rerank_id": LLMType.RERANK, + "tts_id": LLMType.TTS, +} + def ensure_tenant_model_id_for_params(tenant_id: str, param_dict: dict) -> dict: for key in ["llm_id", "embd_id", "asr_id", "img2txt_id", "rerank_id", "tts_id"]: if param_dict.get(key) and not param_dict.get(f"tenant_{key}"): - tenant_model = TenantLLMService.get_api_key(tenant_id, param_dict[key]) + model_type = _KEY_TO_MODEL_TYPE.get(key) + tenant_model = TenantLLMService.get_api_key(tenant_id, param_dict[key], model_type) if tenant_model: param_dict.update({f"tenant_{key}": tenant_model.id}) else: diff --git a/test/testcases/test_web_api/test_dialog_app/test_dialog_routes_unit.py b/test/testcases/test_web_api/test_dialog_app/test_dialog_routes_unit.py index b5b9e1ab73..b3007a9e03 100644 --- a/test/testcases/test_web_api/test_dialog_app/test_dialog_routes_unit.py +++ b/test/testcases/test_web_api/test_dialog_app/test_dialog_routes_unit.py @@ -151,7 +151,7 @@ def _load_dialog_module(monkeypatch): return embd_id.split("@") @staticmethod - def get_api_key(tenant_id, model_name): + def get_api_key(tenant_id, model_name, model_type=None): return _MockTableObject( id=1, tenant_id=tenant_id, diff --git a/test/testcases/test_web_api/test_file_app/test_file2document_routes_unit.py b/test/testcases/test_web_api/test_file_app/test_file2document_routes_unit.py index fdcbc59f08..a81414829c 100644 --- a/test/testcases/test_web_api/test_file_app/test_file2document_routes_unit.py +++ b/test/testcases/test_web_api/test_file_app/test_file2document_routes_unit.py @@ -244,69 +244,33 @@ def test_convert_branch_matrix_unit(monkeypatch): req_state = {"kb_ids": ["kb-1"], "file_ids": ["f1"]} _set_request_json(monkeypatch, module, req_state) - events = {"deleted": []} - + # Falsy file → "File not found!" (synchronous validation) monkeypatch.setattr(module.FileService, "get_by_ids", lambda _ids: [_FalsyFile("f1", module.FileType.DOC.value)]) res = _run(module.convert()) assert res["message"] == "File not found!" + # Valid file but invalid kb → "Can't find this dataset!" (synchronous validation) + # KnowledgebaseService stub returns (False, None) by default monkeypatch.setattr(module.FileService, "get_by_ids", lambda _ids: [_DummyFile("f1", module.FileType.DOC.value)]) - monkeypatch.setattr(module.File2DocumentService, "get_by_file_id", lambda _file_id: [SimpleNamespace(document_id="doc-1")]) - monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _doc_id: (False, None)) - res = _run(module.convert()) - assert res["message"] == "Document not found!" - - monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _doc_id: (True, SimpleNamespace(id=_doc_id))) - monkeypatch.setattr(module.DocumentService, "get_tenant_id", lambda _doc_id: None) - res = _run(module.convert()) - assert res["message"] == "Tenant not found!" - - monkeypatch.setattr(module.DocumentService, "get_tenant_id", lambda _doc_id: "tenant-1") - monkeypatch.setattr(module.DocumentService, "remove_document", lambda *_args, **_kwargs: False) - res = _run(module.convert()) - assert "Document removal" in res["message"] - - monkeypatch.setattr(module.DocumentService, "remove_document", lambda *_args, **_kwargs: True) - monkeypatch.setattr(module.File2DocumentService, "get_by_file_id", lambda _file_id: []) - monkeypatch.setattr(module.File2DocumentService, "delete_by_file_id", lambda file_id: events["deleted"].append(file_id)) - monkeypatch.setattr(module.KnowledgebaseService, "get_by_id", lambda _kb_id: (False, None)) res = _run(module.convert()) assert res["message"] == "Can't find this dataset!" - assert events["deleted"] == ["f1"] + # Valid file and kb → schedules background work, returns data=True immediately kb = SimpleNamespace(id="kb-1", parser_id="naive", pipeline_id="p1", parser_config={}) monkeypatch.setattr(module.KnowledgebaseService, "get_by_id", lambda _kb_id: (True, kb)) - monkeypatch.setattr(module.FileService, "get_by_id", lambda _file_id: (False, None)) res = _run(module.convert()) - assert res["message"] == "Can't find this file!" + assert res["code"] == 0 + assert res["data"] is True + # Folder expansion → schedules background work, returns data=True immediately req_state["file_ids"] = ["folder-1"] monkeypatch.setattr(module.FileService, "get_by_ids", lambda _ids: [_DummyFile("folder-1", module.FileType.FOLDER.value, name="folder")]) monkeypatch.setattr(module.FileService, "get_all_innermost_file_ids", lambda _file_id, _acc: ["inner-1"]) - monkeypatch.setattr( - module.FileService, - "get_by_id", - lambda _file_id: (True, _DummyFile("inner-1", module.FileType.DOC.value, name="inner.txt", location="inner.loc", size=2)), - ) - inserted = {} - - def _insert(payload): - inserted.update(payload) - return SimpleNamespace(id="doc-new") - - monkeypatch.setattr(module.DocumentService, "insert", _insert) - monkeypatch.setattr(module.FileService, "get_parser", lambda _ft, _name, _parser_id: "picked-parser") - monkeypatch.setattr( - module.File2DocumentService, - "insert", - lambda _payload: SimpleNamespace(to_json=lambda: {"file_id": "inner-1", "document_id": "doc-new"}), - ) res = _run(module.convert()) assert res["code"] == 0 - assert res["data"] == [{"file_id": "inner-1", "document_id": "doc-new"}] - assert inserted["parser_id"] == "picked-parser" - assert inserted["pipeline_id"] == "p1" + assert res["data"] is True + # Exception in file lookup → 500 req_state["file_ids"] = ["f1"] monkeypatch.setattr( module.FileService, diff --git a/test/testcases/test_web_api/test_user_app/test_user_app_unit.py b/test/testcases/test_web_api/test_user_app/test_user_app_unit.py index 5cecafe235..85361cf2d3 100644 --- a/test/testcases/test_web_api/test_user_app/test_user_app_unit.py +++ b/test/testcases/test_web_api/test_user_app/test_user_app_unit.py @@ -230,7 +230,7 @@ def _load_user_app(monkeypatch): return True @staticmethod - def get_api_key(tenant_id, model_name): + def get_api_key(tenant_id, model_name, model_type=None): return _MockTableObject( id=1, tenant_id=tenant_id, diff --git a/web/src/components/shared-badge.tsx b/web/src/components/shared-badge.tsx index 05d4a893bb..03625ba675 100644 --- a/web/src/components/shared-badge.tsx +++ b/web/src/components/shared-badge.tsx @@ -8,5 +8,5 @@ export function SharedBadge({ children }: PropsWithChildren) { return null; } - return {children}; + return {children}; }