fix(api): use dataset-owner tenant for legacy /chunks docstore cleanup (#15961)

This commit is contained in:
kpdev
2026-06-23 23:24:40 -07:00
committed by GitHub
parent ede46e0bb8
commit 68d2ca0ff1
2 changed files with 83 additions and 8 deletions

View File

@@ -162,6 +162,9 @@ def _enrich_chunks_with_document_metadata(chunks: list[dict], metadata_fields=No
async def parse(tenant_id, dataset_id):
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
req = await get_request_json()
if not req.get("document_ids"):
return get_error_data_result("`document_ids` is required")
@@ -190,15 +193,15 @@ async def parse(tenant_id, dataset_id):
== 0
):
return get_error_data_result("Can't parse document that is currently being processed")
index_name = search.index_name(tenant_id)
if settings.docStoreConn.index_exist(index_name, dataset_id):
settings.docStoreConn.delete({"doc_id": id}, index_name, dataset_id)
index_name = search.index_name(dataset_tenant_id)
if settings.docStoreConn.index_exist(index_name, doc[0].kb_id):
settings.docStoreConn.delete({"doc_id": id}, index_name, doc[0].kb_id)
else:
logging.info(
"Skipping chunk delete during parse for doc %s: index %s/%s does not exist",
id,
index_name,
dataset_id,
doc[0].kb_id,
)
TaskService.filter_delete([Task.doc_id == id])
e, doc = DocumentService.get_by_id(id)
@@ -227,6 +230,9 @@ async def parse(tenant_id, dataset_id):
async def stop_parsing(tenant_id, dataset_id):
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
req = await get_request_json()
if not req.get("document_ids"):
@@ -249,15 +255,15 @@ async def stop_parsing(tenant_id, dataset_id):
cancel_all_task_of(id)
info = {"run": "2", "progress": 0, "chunk_num": 0}
DocumentService.update_by_id(id, info)
index_name = search.index_name(tenant_id)
if settings.docStoreConn.index_exist(index_name, dataset_id):
settings.docStoreConn.delete({"doc_id": doc[0].id}, index_name, dataset_id)
index_name = search.index_name(dataset_tenant_id)
if settings.docStoreConn.index_exist(index_name, doc[0].kb_id):
settings.docStoreConn.delete({"doc_id": doc[0].id}, index_name, doc[0].kb_id)
else:
logging.info(
"Skipping chunk delete during stop_parsing for doc %s: index %s/%s does not exist",
doc[0].id,
index_name,
dataset_id,
doc[0].kb_id,
)
success_count += 1
if duplicate_messages:

View File

@@ -731,6 +731,75 @@ class TestDocRoutesUnit:
res = _run(module.stop_parsing.__wrapped__("tenant-1", "ds-1"))
assert res["code"] == 0
def test_legacy_chunks_parse_uses_dataset_owner_tenant_for_delete(self, monkeypatch):
module = _load_doc_module(monkeypatch)
deleted = []
requester_tenant = "team-member"
owner_tenant = "dataset-owner"
monkeypatch.setattr(module.KnowledgebaseService, "accessible", lambda **_kwargs: True)
monkeypatch.setattr(
module.KnowledgebaseService,
"get_by_id",
lambda _id: (True, SimpleNamespace(tenant_id=owner_tenant)),
)
monkeypatch.setattr(module, "get_request_json", lambda: _AwaitableValue({"document_ids": ["doc-1"]}))
monkeypatch.setattr(module, "check_duplicate_ids", lambda ids, _kind: (ids, []))
monkeypatch.setattr(
module.DocumentService,
"query",
lambda **_kwargs: [_DummyDoc(doc_id="doc-1", run=module.TaskStatus.UNSTART.value)],
)
monkeypatch.setattr(module.DocumentService, "filter_update", lambda *_args, **_kwargs: 1)
monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _id: (True, _DummyDoc(doc_id="doc-1")))
monkeypatch.setattr(module.File2DocumentService, "get_storage_address", lambda **_kwargs: ("b", "n"))
monkeypatch.setattr(module.TaskService, "filter_delete", lambda *_args, **_kwargs: None)
monkeypatch.setattr(module, "queue_tasks", lambda *_args, **_kwargs: None)
_patch_docstore(
monkeypatch,
module,
index_exist=lambda *_args, **_kwargs: True,
delete=lambda condition, index, kb_id: deleted.append((condition, index, kb_id)),
)
res = _run(module.parse.__wrapped__(requester_tenant, "ds-1"))
assert res["code"] == 0
assert deleted == [({"doc_id": "doc-1"}, module.search.index_name(owner_tenant), "kb-1")]
def test_legacy_chunks_stop_uses_dataset_owner_tenant_for_delete(self, monkeypatch):
module = _load_doc_module(monkeypatch)
deleted = []
requester_tenant = "team-member"
owner_tenant = "dataset-owner"
monkeypatch.setattr(module.KnowledgebaseService, "accessible", lambda **_kwargs: True)
monkeypatch.setattr(
module.KnowledgebaseService,
"get_by_id",
lambda _id: (True, SimpleNamespace(tenant_id=owner_tenant)),
)
monkeypatch.setattr(module, "get_request_json", lambda: _AwaitableValue({"document_ids": ["doc-1"]}))
monkeypatch.setattr(module, "check_duplicate_ids", lambda ids, _kind: (ids, []))
monkeypatch.setattr(
module.DocumentService,
"query",
lambda **_kwargs: [_DummyDoc(doc_id="doc-1", run=module.TaskStatus.RUNNING.value)],
)
monkeypatch.setattr(module, "cancel_all_task_of", lambda *_args, **_kwargs: None)
monkeypatch.setattr(module.DocumentService, "update_by_id", lambda *_args, **_kwargs: True)
_patch_docstore(
monkeypatch,
module,
index_exist=lambda *_args, **_kwargs: True,
delete=lambda condition, index, kb_id: deleted.append((condition, index, kb_id)),
)
res = _run(module.stop_parsing.__wrapped__(requester_tenant, "ds-1"))
assert res["code"] == 0
assert deleted == [({"doc_id": "doc-1"}, module.search.index_name(owner_tenant), "kb-1")]
def test_stop_parse_documents_cleans_partial_chunks(self, monkeypatch):
module = _load_doc_module(monkeypatch, module_basename="document_api")
updated = []