From 68d2ca0ff195ececbc92718200dc76b2f36a84cb Mon Sep 17 00:00:00 2001 From: kpdev <156195510+kiannidev@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:24:40 -0700 Subject: [PATCH] fix(api): use dataset-owner tenant for legacy /chunks docstore cleanup (#15961) --- api/apps/restful_apis/chunk_api.py | 22 +++--- .../test_doc_sdk_routes_unit.py | 69 +++++++++++++++++++ 2 files changed, 83 insertions(+), 8 deletions(-) diff --git a/api/apps/restful_apis/chunk_api.py b/api/apps/restful_apis/chunk_api.py index 47c3c9dac9..eae8ae2f41 100644 --- a/api/apps/restful_apis/chunk_api.py +++ b/api/apps/restful_apis/chunk_api.py @@ -162,6 +162,9 @@ def _enrich_chunks_with_document_metadata(chunks: list[dict], metadata_fields=No async def parse(tenant_id, dataset_id): if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") + dataset_tenant_id = _get_dataset_tenant_id(dataset_id) + if not dataset_tenant_id: + return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") req = await get_request_json() if not req.get("document_ids"): return get_error_data_result("`document_ids` is required") @@ -190,15 +193,15 @@ async def parse(tenant_id, dataset_id): == 0 ): return get_error_data_result("Can't parse document that is currently being processed") - index_name = search.index_name(tenant_id) - if settings.docStoreConn.index_exist(index_name, dataset_id): - settings.docStoreConn.delete({"doc_id": id}, index_name, dataset_id) + index_name = search.index_name(dataset_tenant_id) + if settings.docStoreConn.index_exist(index_name, doc[0].kb_id): + settings.docStoreConn.delete({"doc_id": id}, index_name, doc[0].kb_id) else: logging.info( "Skipping chunk delete during parse for doc %s: index %s/%s does not exist", id, index_name, - dataset_id, + doc[0].kb_id, ) TaskService.filter_delete([Task.doc_id == id]) e, doc = DocumentService.get_by_id(id) @@ -227,6 +230,9 @@ async def parse(tenant_id, dataset_id): async def stop_parsing(tenant_id, dataset_id): if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") + dataset_tenant_id = _get_dataset_tenant_id(dataset_id) + if not dataset_tenant_id: + return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") req = await get_request_json() if not req.get("document_ids"): @@ -249,15 +255,15 @@ async def stop_parsing(tenant_id, dataset_id): cancel_all_task_of(id) info = {"run": "2", "progress": 0, "chunk_num": 0} DocumentService.update_by_id(id, info) - index_name = search.index_name(tenant_id) - if settings.docStoreConn.index_exist(index_name, dataset_id): - settings.docStoreConn.delete({"doc_id": doc[0].id}, index_name, dataset_id) + index_name = search.index_name(dataset_tenant_id) + if settings.docStoreConn.index_exist(index_name, doc[0].kb_id): + settings.docStoreConn.delete({"doc_id": doc[0].id}, index_name, doc[0].kb_id) else: logging.info( "Skipping chunk delete during stop_parsing for doc %s: index %s/%s does not exist", doc[0].id, index_name, - dataset_id, + doc[0].kb_id, ) success_count += 1 if duplicate_messages: diff --git a/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py b/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py index 2131aa533c..663735b14e 100644 --- a/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py +++ b/test/testcases/test_http_api/test_file_management_within_dataset/test_doc_sdk_routes_unit.py @@ -731,6 +731,75 @@ class TestDocRoutesUnit: res = _run(module.stop_parsing.__wrapped__("tenant-1", "ds-1")) assert res["code"] == 0 + def test_legacy_chunks_parse_uses_dataset_owner_tenant_for_delete(self, monkeypatch): + module = _load_doc_module(monkeypatch) + deleted = [] + requester_tenant = "team-member" + owner_tenant = "dataset-owner" + + monkeypatch.setattr(module.KnowledgebaseService, "accessible", lambda **_kwargs: True) + monkeypatch.setattr( + module.KnowledgebaseService, + "get_by_id", + lambda _id: (True, SimpleNamespace(tenant_id=owner_tenant)), + ) + monkeypatch.setattr(module, "get_request_json", lambda: _AwaitableValue({"document_ids": ["doc-1"]})) + monkeypatch.setattr(module, "check_duplicate_ids", lambda ids, _kind: (ids, [])) + monkeypatch.setattr( + module.DocumentService, + "query", + lambda **_kwargs: [_DummyDoc(doc_id="doc-1", run=module.TaskStatus.UNSTART.value)], + ) + monkeypatch.setattr(module.DocumentService, "filter_update", lambda *_args, **_kwargs: 1) + monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _id: (True, _DummyDoc(doc_id="doc-1"))) + monkeypatch.setattr(module.File2DocumentService, "get_storage_address", lambda **_kwargs: ("b", "n")) + monkeypatch.setattr(module.TaskService, "filter_delete", lambda *_args, **_kwargs: None) + monkeypatch.setattr(module, "queue_tasks", lambda *_args, **_kwargs: None) + _patch_docstore( + monkeypatch, + module, + index_exist=lambda *_args, **_kwargs: True, + delete=lambda condition, index, kb_id: deleted.append((condition, index, kb_id)), + ) + + res = _run(module.parse.__wrapped__(requester_tenant, "ds-1")) + + assert res["code"] == 0 + assert deleted == [({"doc_id": "doc-1"}, module.search.index_name(owner_tenant), "kb-1")] + + def test_legacy_chunks_stop_uses_dataset_owner_tenant_for_delete(self, monkeypatch): + module = _load_doc_module(monkeypatch) + deleted = [] + requester_tenant = "team-member" + owner_tenant = "dataset-owner" + + monkeypatch.setattr(module.KnowledgebaseService, "accessible", lambda **_kwargs: True) + monkeypatch.setattr( + module.KnowledgebaseService, + "get_by_id", + lambda _id: (True, SimpleNamespace(tenant_id=owner_tenant)), + ) + monkeypatch.setattr(module, "get_request_json", lambda: _AwaitableValue({"document_ids": ["doc-1"]})) + monkeypatch.setattr(module, "check_duplicate_ids", lambda ids, _kind: (ids, [])) + monkeypatch.setattr( + module.DocumentService, + "query", + lambda **_kwargs: [_DummyDoc(doc_id="doc-1", run=module.TaskStatus.RUNNING.value)], + ) + monkeypatch.setattr(module, "cancel_all_task_of", lambda *_args, **_kwargs: None) + monkeypatch.setattr(module.DocumentService, "update_by_id", lambda *_args, **_kwargs: True) + _patch_docstore( + monkeypatch, + module, + index_exist=lambda *_args, **_kwargs: True, + delete=lambda condition, index, kb_id: deleted.append((condition, index, kb_id)), + ) + + res = _run(module.stop_parsing.__wrapped__(requester_tenant, "ds-1")) + + assert res["code"] == 0 + assert deleted == [({"doc_id": "doc-1"}, module.search.index_name(owner_tenant), "kb-1")] + def test_stop_parse_documents_cleans_partial_chunks(self, monkeypatch): module = _load_doc_module(monkeypatch, module_basename="document_api") updated = []