#
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from io import BytesIO
import logging
import json
import os
import re
from pathlib import Path
from quart import request, make_response, send_file
from peewee import OperationalError
from pydantic import ValidationError
from api.apps import AUTH_JWT, AUTH_API, AUTH_BETA, current_user, login_required
from api.constants import FILE_NAME_LEN_LIMIT, IMG_BASE64_PREFIX
from api.apps.services.document_api_service import validate_document_update_fields, map_doc_keys, \
    map_doc_keys_with_run_status, update_document_name_only, update_chunk_method, update_document_status_only, \
    reset_document_for_reparse
from api.db import VALID_FILE_TYPES, FileType
from api.db.db_models import API4Conversation, DB
from api.db.services import duplicate_name
from api.db.services.doc_metadata_service import DocMetadataService
from api.db.db_models import Task
from api.db.services.document_service import DocumentService
from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.canvas_service import UserCanvasService
from api.common.check_team_permission import check_kb_team_permission
from api.db.services.task_service import TaskService, cancel_all_task_of
from api.utils.api_utils import construct_json_result, get_data_error_result, get_error_data_result, get_result, get_json_result, \
    server_error_response, add_tenant_id_to_kwargs, get_request_json, get_error_argument_result, check_duplicate_ids
from api.utils.pagination_utils import validate_rest_api_page_size
from api.utils.validation_utils import (
    UpdateDocumentReq, format_validation_error_message, validate_and_parse_json_request, DeleteDocumentReq,
)
from common import settings
from common.constants import ParserType, RetCode, TaskStatus, SANDBOX_ARTIFACT_BUCKET
from common.metadata_utils import convert_conditions, meta_filter, turn2jsonschema
from common.misc_utils import get_uuid, thread_pool_exec
from api.utils.file_utils import filename_type, thumbnail
from api.utils.web_utils import CONTENT_TYPE_MAP, html2pdf, is_valid_url, apply_safe_file_response_headers
from common.ssrf_guard import assert_url_is_safe
from rag.nlp import search


@manager.route("/documents/upload", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def upload_info(tenant_id: str):
    """
    Upload a document and get its parsed info.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: formData
        name: file
        type: file
        required: false
        description: File to upload.
      - in: query
        name: url
        type: string
        required: false
        description: URL to fetch file from.
    responses:
      200:
        description: Successful operation.
    """
    files = await request.files
    file_objs = files.getlist("file") if files and files.get("file") else []
    url = request.args.get("url")
    if file_objs and url:
        return get_error_argument_result("Provide either multipart file(s) or ?url=..., not both.")
    if not file_objs and not url:
        return get_error_argument_result("Missing input: provide multipart file(s) or url")
    try:
        if url and not file_objs:
            try:
                assert_url_is_safe(url)
            except ValueError as ve:
                logging.warning("upload_info: rejected unsafe url: %s", ve)
                return get_error_argument_result(str(ve))
            data = await thread_pool_exec(FileService.upload_info, tenant_id, None, url)
            return get_result(data=data)
        if len(file_objs) == 1:
            data = await thread_pool_exec(FileService.upload_info, tenant_id, file_objs[0], None)
            return get_result(data=data)
        results = [await thread_pool_exec(FileService.upload_info, tenant_id, f, None) for f in file_objs]
        return get_result(data=results)
    except Exception as e:
        logging.exception("upload_info failed")
        return server_error_response(e)
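

# Illustrative sketch (hypothetical helper, not called by any route in this
# module): the input-mode rule that upload_info enforces, restated as a pure
# function so it can be unit-tested without a Quart request context.
# The name `_sketch_upload_mode` and the returned mode strings are assumptions.
# The two error messages are copied from upload_info.
def _sketch_upload_mode(file_count: int, url) -> tuple:
    """Return (mode, error); exactly one of the two is None."""
    if file_count and url:
        return None, "Provide either multipart file(s) or ?url=..., not both."
    if not file_count and not url:
        return None, "Missing input: provide multipart file(s) or url"
    if url:
        # upload_info additionally runs assert_url_is_safe(url) before fetching
        return "url", None
    # One file yields a single result object; several yield a list of results
    return ("single" if file_count == 1 else "multi"), None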


@manager.route("/datasets/<dataset_id>/documents/<document_id>", methods=["PATCH"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def update_document(tenant_id, dataset_id, document_id):
    """
    Update a document within a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: path
        name: document_id
        type: string
        required: true
        description: ID of the document to update.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Document update parameters.
        required: true
        schema:
          type: object
          properties:
            name:
              type: string
              description: New name of the document.
            parser_config:
              type: object
              description: Parser configuration.
            chunk_method:
              type: string
              description: Chunking method.
            enabled:
              type: boolean
              description: Document status.
    responses:
      200:
        description: Document updated successfully.
        schema:
          type: object
    """
    req = await get_request_json()
    # Verify ownership and existence of dataset and document
    if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
        return get_error_data_result(message="You don't own the dataset.")
    e, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not e:
        return get_error_data_result(message="Can't find this dataset!")

    # Prepare data for validation
    docs = DocumentService.query(kb_id=dataset_id, id=document_id)
    if not docs:
        return get_error_data_result(message="The dataset doesn't own the document.")

    # Validate document update request parameters
    try:
        update_doc_req = UpdateDocumentReq(**req)
    except ValidationError as e:
        return get_error_data_result(message=format_validation_error_message(e), code=RetCode.DATA_ERROR)

    doc = docs[0]
    # Further checks against the document's stored state (from the DB)
    error_msg, error_code = validate_document_update_fields(update_doc_req, doc, req)
    if error_msg:
        return get_error_data_result(message=error_msg, code=error_code)

    # All validations passed; now perform the updates
    # meta_fields provided: update the document's metadata
    if "meta_fields" in req:
        if not DocMetadataService.update_document_metadata(document_id, update_doc_req.meta_fields):
            return get_error_data_result(message="Failed to update metadata")

    # A new name was provided and differs from the current one: rename
    if "name" in req and req["name"] != doc.name:
        if error := update_document_name_only(document_id, req["name"]):
            return error

    # "parser_id" provided but incompatible with the document's file type
    if "parser_id" in req and (
        (doc.type == FileType.VISUAL and req["parser_id"] != "picture")
        or (re.search(r"\.(ppt|pptx|pages)$", doc.name) and req["parser_id"] != "presentation")
    ):
        return get_data_error_result(message="Not supported yet!")

    # parser_config provided (already validated by UpdateDocumentReq): update it
    if update_doc_req.parser_config:
        req["parser_config"].update(update_doc_req.parser_config.ext)
        DocumentService.update_parser_config(doc.id, req["parser_config"])

    # pipeline_id provided: reset the document for reparsing
    if update_doc_req.pipeline_id:
        if error := reset_document_for_reparse(doc, tenant_id, pipeline_id=update_doc_req.pipeline_id):
            return error
    # chunk_method provided: update_chunk_method checks whether it differs from the current one
    elif update_doc_req.chunk_method:
        if error := update_chunk_method(req, doc, tenant_id):
            return error

    if "enabled" in req:  # already validated by UpdateDocumentReq; it is an int if present
        # update_document_status_only checks whether the flag changed and updates only if so
        if error := update_document_status_only(int(req["enabled"]), doc, kb):
            return error

    try:
        original_doc_id = doc.id
        ok, doc = DocumentService.get_by_id(doc.id)
        if not ok:
            return get_error_data_result(message=f"Can not get document by id:{original_doc_id}")
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
    renamed_doc = map_doc_keys(doc)
    return get_result(data=renamed_doc)
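

# Illustrative sketch (hypothetical helper, not called by update_document):
# the parser_id / file-type compatibility rule from update_document, restated
# as a pure predicate. Two substitutions are assumptions made so the sketch
# needs no imports:
#   - `is_visual` stands in for `doc.type == FileType.VISUAL`.
#   - str.endswith replaces the route's re.search(r"\.(ppt|pptx|pages)$", ...).
#     The two are equivalent for ordinary file names, and both are case-sensitive.
def _sketch_parser_id_supported(is_visual: bool, doc_name: str, parser_id: str) -> bool:
    """Return False exactly where update_document answers "Not supported yet!"."""
    if is_visual and parser_id != "picture":
        return False
    if doc_name.endswith((".ppt", ".pptx", ".pages")) and parser_id != "presentation":
        return False
    return True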


@manager.route("/datasets/<dataset_id>/metadata/summary", methods=["GET"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def metadata_summary(dataset_id, tenant_id):
    """
    Get metadata summary for a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: query
        name: doc_ids
        type: string
        required: false
        description: Comma-separated document IDs to filter metadata.
    responses:
      200:
        description: Metadata summary retrieved successfully.
    """
    if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
        return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ")
    # Get doc_ids from query parameters (comma-separated string)
    doc_ids_param = request.args.get("doc_ids", "")
    doc_ids = doc_ids_param.split(",") if doc_ids_param else None
    try:
        summary = DocMetadataService.get_metadata_summary(dataset_id, doc_ids)
        return get_result(data={"summary": summary})
    except Exception as e:
        return server_error_response(e)


@manager.route("/datasets/<dataset_id>/metadata/update", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def metadata_batch_update(dataset_id, tenant_id):
    """
    Batch update metadata for documents in a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
    requestBody:
      required: true
      content:
        application/json:
          schema:
            type: object
            properties:
              selector:
                type: object
              updates:
                type: array
              deletes:
                type: array
    responses:
      200:
        description: Metadata updated successfully.
    """
    if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
        return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ")
    req = await get_request_json()
    selector = req.get("selector", {}) or {}
    updates = req.get("updates", []) or []
    deletes = req.get("deletes", []) or []
    if not isinstance(selector, dict):
        return get_error_data_result(message="selector must be an object.")
    if not isinstance(updates, list) or not isinstance(deletes, list):
        return get_error_data_result(message="updates and deletes must be lists.")
    metadata_condition = selector.get("metadata_condition", {}) or {}
    if metadata_condition and not isinstance(metadata_condition, dict):
        return get_error_data_result(message="metadata_condition must be an object.")
    document_ids = selector.get("document_ids", []) or []
    if document_ids and not isinstance(document_ids, list):
        return get_error_data_result(message="document_ids must be a list.")
    for upd in updates:
        if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd:
            return get_error_data_result(message="Each update requires key and value.")
    for d in deletes:
        if not isinstance(d, dict) or not d.get("key"):
            return get_error_data_result(message="Each delete requires key.")

    target_doc_ids = set()
    if document_ids:
        kb_doc_ids = KnowledgebaseService.list_documents_by_ids([dataset_id])
        invalid_ids = set(document_ids) - set(kb_doc_ids)
        if invalid_ids:
            return get_error_data_result(message=f"These documents do not belong to dataset {dataset_id}: {', '.join(invalid_ids)}")
        target_doc_ids = set(document_ids)
    if metadata_condition:
        metas = DocMetadataService.get_flatted_meta_by_kbs([dataset_id])
        filtered_ids = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")))
        target_doc_ids = target_doc_ids & filtered_ids
        if metadata_condition.get("conditions") and not target_doc_ids:
            return get_result(data={"updated": 0, "matched_docs": 0})
    target_doc_ids = list(target_doc_ids)
    updated = DocMetadataService.batch_update_metadata(dataset_id, target_doc_ids, updates, deletes)
    return get_result(data={"updated": updated, "matched_docs": len(target_doc_ids)})
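

# Illustrative sketch (hypothetical helper, not called by
# metadata_batch_update): the request-shape checks that route performs before
# touching the database, as a pure function over the parsed JSON body.
# The helper name is an assumption. The check order and error messages are
# copied from the route, so the first failing check wins, as it does there.
def _sketch_validate_metadata_update(req: dict):
    """Return an error message string, or None if the body is well formed."""
    selector = req.get("selector", {}) or {}
    updates = req.get("updates", []) or []
    deletes = req.get("deletes", []) or []
    if not isinstance(selector, dict):
        return "selector must be an object."
    if not isinstance(updates, list) or not isinstance(deletes, list):
        return "updates and deletes must be lists."
    metadata_condition = selector.get("metadata_condition", {}) or {}
    if metadata_condition and not isinstance(metadata_condition, dict):
        return "metadata_condition must be an object."
    document_ids = selector.get("document_ids", []) or []
    if document_ids and not isinstance(document_ids, list):
        return "document_ids must be a list."
    for upd in updates:
        # Each update needs a non-empty key and an explicit "value" entry (None is allowed)
        if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd:
            return "Each update requires key and value."
    for d in deletes:
        if not isinstance(d, dict) or not d.get("key"):
            return "Each delete requires key."
    return None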


@manager.route("/datasets/<dataset_id>/documents", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def upload_document(dataset_id, tenant_id):
    """
    Upload documents to a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: query
        name: type
        type: string
        required: false
        default: local
        enum: [local, web, empty]
        description: >
          Upload mode. "local" uploads the multipart `file` parts; "web" renders
          the page at form field `url` to a PDF named after form field `name`;
          "empty" creates an empty virtual document named by the JSON body field `name`.
      - in: formData
        name: file
        type: file
        required: false
        description: Document files to upload. Required when `type` is "local".
      - in: formData
        name: parent_path
        type: string
        description: Optional nested path under the parent folder. Uses '/' separators.
      - in: query
        name: return_raw_files
        type: boolean
        required: false
        default: false
        description: Whether to skip document key mapping and return raw document data.
    responses:
      200:
        description: Successfully uploaded documents.
        schema:
          type: object
          properties:
            data:
              type: array
              items:
                type: object
                properties:
                  id:
                    type: string
                    description: Document ID.
                  name:
                    type: string
                    description: Document name.
                  chunk_count:
                    type: integer
                    description: Number of chunks.
                  token_count:
                    type: integer
                    description: Number of tokens.
                  dataset_id:
                    type: string
                    description: ID of the dataset.
                  chunk_method:
                    type: string
                    description: Chunking method used.
                  run:
                    type: string
                    description: Processing status.
    """
    upload_type = (request.args.get("type") or "local").lower()
    e, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not e:
        logging.error(f"Can't find the dataset with ID {dataset_id}!")
        return get_error_data_result(message=f"Can't find the dataset with ID {dataset_id}!", code=RetCode.DATA_ERROR)
    if not check_kb_team_permission(kb, tenant_id):
        logging.error("No authorization.")
        return get_error_data_result(message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
    if upload_type == "web":
        return await _upload_web_document(dataset_id, kb, tenant_id)
    if upload_type == "empty":
        return await _upload_empty_document(dataset_id, kb, tenant_id)
    if upload_type != "local":
        return get_error_data_result(
            message='`type` must be one of "local", "web", or "empty".',
            code=RetCode.ARGUMENT_ERROR,
        )
    return await _upload_local_documents(kb, tenant_id)


async def _upload_web_document(dataset_id, kb, tenant_id):
    form = await request.form
    name = (form.get("name") or "").strip()
    url = form.get("url")
    if not name:
        return get_error_data_result(message='Lack of "name"', code=RetCode.ARGUMENT_ERROR)
    if not url:
        return get_error_data_result(message='Lack of "url"', code=RetCode.ARGUMENT_ERROR)
    if len(name.encode("utf-8")) > FILE_NAME_LEN_LIMIT:
        return get_error_data_result(
            message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.",
            code=RetCode.ARGUMENT_ERROR,
        )
    if not is_valid_url(url):
        return get_error_data_result(message="The URL format is invalid", code=RetCode.ARGUMENT_ERROR)
    blob = html2pdf(url)
    if not blob:
        return server_error_response(ValueError("Download failure."))
    root_folder = FileService.get_root_folder(tenant_id)
    FileService.init_knowledgebase_docs(root_folder["id"], tenant_id)
    kb_root_folder = FileService.get_kb_folder(tenant_id)
    kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
    try:
        filename = duplicate_name(DocumentService.query, name=f"{name}.pdf", kb_id=kb.id)
        filetype = filename_type(filename)
        if filetype == FileType.OTHER.value:
            raise RuntimeError("This type of file has not been supported yet!")
        location = filename
        while settings.STORAGE_IMPL.obj_exist(dataset_id, location):
            location += "_"
        settings.STORAGE_IMPL.put(dataset_id, location, blob)
        doc = {
            "id": get_uuid(),
            "kb_id": kb.id,
            "parser_id": kb.parser_id,
            "pipeline_id": kb.pipeline_id,
            "parser_config": kb.parser_config,
            "created_by": tenant_id,
            "type": filetype,
            "name": filename,
            "location": location,
            "size": len(blob),
            "thumbnail": thumbnail(filename, blob),
            "suffix": Path(filename).suffix.lstrip("."),
        }
        if doc["type"] == FileType.VISUAL:
            doc["parser_id"] = ParserType.PICTURE.value
        if doc["type"] == FileType.AURAL:
            doc["parser_id"] = ParserType.AUDIO.value
        if re.search(r"\.(ppt|pptx|pages)$", filename):
            doc["parser_id"] = ParserType.PRESENTATION.value
        if re.search(r"\.(eml)$", filename):
            doc["parser_id"] = ParserType.EMAIL.value
        DocumentService.insert(doc)
        FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
        return get_result(data=map_doc_keys_with_run_status(doc, run_status="0"))
    except Exception as e:
        return server_error_response(e)
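

# Illustrative sketch (hypothetical helper, not used by any route): a minimal
# model of how the web upload above picks a storage key that does not collide
# with an existing object. A plain set of existing keys stands in for
# settings.STORAGE_IMPL.obj_exist(); no real storage backend is involved.
def _sketch_free_storage_location(filename: str, existing: set) -> str:
    location = filename
    # Keep appending "_" until the key is unused, mirroring the while-loop above.
    while location in existing:
        location += "_"
    return location

# Note: the "_" suffix lands after the extension, so the stored key
# ("page.pdf_") can differ from the document name ("page.pdf") shown to users.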


async def _upload_empty_document(dataset_id, kb, tenant_id):
    req = await get_request_json()
    name = (req.get("name") or "").strip()
    if not name:
        return get_error_data_result(message="File name can't be empty.", code=RetCode.ARGUMENT_ERROR)
    if len(name.encode("utf-8")) > FILE_NAME_LEN_LIMIT:
        return get_error_data_result(
            message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.",
            code=RetCode.ARGUMENT_ERROR,
        )
    if DocumentService.query(name=name, kb_id=dataset_id):
        return get_error_data_result(message="Duplicated document name in the same dataset.")
    try:
        kb_root_folder = FileService.get_kb_folder(kb.tenant_id)
        if not kb_root_folder:
            return get_error_data_result(message="Cannot find the root folder.")
        kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
        if not kb_folder:
            return get_error_data_result(message="Cannot find the kb folder for this file.")
        doc = DocumentService.insert(
            {
                "id": get_uuid(),
                "kb_id": kb.id,
                "parser_id": kb.parser_id,
                "pipeline_id": kb.pipeline_id,
                "parser_config": kb.parser_config,
                "created_by": tenant_id,
                "type": FileType.VIRTUAL,
                "name": name,
                "suffix": Path(name).suffix.lstrip("."),
                "location": "",
                "size": 0,
            }
        )
        FileService.add_file_from_kb(doc.to_dict(), kb_folder["id"], kb.tenant_id)
        return get_result(data=map_doc_keys(doc))
    except Exception as e:
        return server_error_response(e)
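

# Illustrative sketch (hypothetical helper, not used by any route): the
# file-name checks from _upload_empty_document above (the web upload applies
# the same byte limit but reports an empty name differently). The limit is
# measured in UTF-8 bytes, not characters, so non-ASCII names reach it sooner.
# `limit` is a parameter because FILE_NAME_LEN_LIMIT's value is defined elsewhere.
def _sketch_file_name_error(name, limit: int):
    name = (name or "").strip()
    if not name:
        return "File name can't be empty."
    # "é" is one character but two UTF-8 bytes.
    if len(name.encode("utf-8")) > limit:
        return f"File name must be {limit} bytes or less."
    return None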


async def _upload_local_documents(kb, tenant_id):
    form = await request.form
    files = await request.files
    if "file" not in files:
        logging.error("No file part!")
        return get_error_data_result(message="No file part!", code=RetCode.ARGUMENT_ERROR)
    file_objs = files.getlist("file")
    for file_obj in file_objs:
        if file_obj is None or file_obj.filename is None or file_obj.filename == "":
            logging.error("No file selected!")
            return get_error_data_result(message="No file selected!", code=RetCode.ARGUMENT_ERROR)
        if len(file_obj.filename.encode("utf-8")) > FILE_NAME_LEN_LIMIT:
            msg = f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less."
            logging.error(msg)
            return get_error_data_result(message=msg, code=RetCode.ARGUMENT_ERROR)
    # Parse optional parser_config overrides from form data
    parser_config_override = None
    raw_parser_config = form.get("parser_config")
    if raw_parser_config:
        try:
            parsed = json.loads(raw_parser_config)
            if isinstance(parsed, dict):
                # Only allow known table column config keys to prevent arbitrary overrides
                allowed_keys = {"table_column_mode", "table_column_roles"}
                parser_config_override = {k: v for k, v in parsed.items() if k in allowed_keys}
                if not parser_config_override:
                    parser_config_override = None
        except (json.JSONDecodeError, TypeError):
            parser_config_override = None
    err, files = await thread_pool_exec(
        FileService.upload_document, kb, file_objs, tenant_id,
        parent_path=form.get("parent_path"),
        parser_config_override=parser_config_override,
    )
    # Handle partial success: some files uploaded successfully, some had errors
    is_partial_success = err and files
    if err and not is_partial_success:
        msg = "\n".join(err)
        logging.error(msg)
        return get_error_data_result(message=msg, code=RetCode.SERVER_ERROR)
    if not files:
        msg = "There seems to be an issue with your file format. please verify it is correct and not corrupted."
        logging.error(msg)
        return get_error_data_result(message=msg, code=RetCode.DATA_ERROR)
    files = [f[0] for f in files]  # remove the blob
    return_raw_files = request.args.get("return_raw_files", "false").lower() == "true"
    if return_raw_files:
        doc_data = files
    else:
        doc_data = [map_doc_keys_with_run_status(doc, run_status="0") for doc in files]
    # For partial success, include error message along with successful uploads
    if is_partial_success:
        msg = "\n".join(err)
        logging.warning(f"Partial upload success: {len(files)} succeeded, {len(err)} failed - {msg}")
        return construct_json_result(code=RetCode.SERVER_ERROR, message=msg, data=doc_data)
    return get_result(data=doc_data)
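

# Illustrative sketch (hypothetical helper, not used by any route): the
# parser_config override parsing from _upload_local_documents, isolated as a
# pure function. Only the two table-column keys survive; any other key,
# malformed JSON, or a non-object payload yields None, in which case the
# dataset's own parser_config is used unchanged.
def _sketch_parse_parser_config_override(raw):
    import json

    if not raw:
        return None
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None
    if not isinstance(parsed, dict):
        return None
    allowed_keys = {"table_column_mode", "table_column_roles"}
    override = {k: v for k, v in parsed.items() if k in allowed_keys}
    # An empty whitelist result is treated the same as "no override".
    return override or None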


@manager.route("/datasets/<dataset_id>/documents", methods=["GET"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
def list_docs(dataset_id, tenant_id):
    """
    List documents in a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: query
        name: page
        type: integer
        required: false
        default: 1
        description: Page number.
      - in: query
        name: page_size
        type: integer
        required: false
        default: 30
        description: Number of items per page.
      - in: query
        name: orderby
        type: string
        required: false
        default: "create_time"
        description: Field to order by.
      - in: query
        name: desc
        type: boolean
        required: false
        default: true
        description: Whether to sort in descending order.
      - in: query
        name: keywords
        type: string
        required: false
        description: Keywords to search for in document names.
      - in: query
        name: create_time_from
        type: integer
        required: false
        default: 0
        description: Unix timestamp; only documents created at or after this time are returned. 0 means no lower bound.
      - in: query
        name: create_time_to
        type: integer
        required: false
        default: 0
        description: Unix timestamp; only documents created at or before this time are returned. 0 means no upper bound.
      - in: query
        name: suffix
        type: array
        items:
          type: string
        required: false
        description: Filter by file suffix (e.g., ["pdf", "txt", "docx"]).
      - in: query
        name: run
        type: array
        items:
          type: string
        required: false
        description: Filter by document run status. Supports both numeric ("0", "1", "2", "3", "4") and text formats ("UNSTART", "RUNNING", "CANCEL", "DONE", "FAIL").
      - in: query
        name: type
        type: string
        required: false
        description: Set to "filter" to return aggregated filter values ({"total", "filter"}) instead of the document list.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: List of documents.
        schema:
          type: object
          properties:
            total:
              type: integer
              description: Total number of documents.
            docs:
              type: array
              items:
                type: object
                properties:
                  id:
                    type: string
                    description: Document ID.
                  name:
                    type: string
                    description: Document name.
                  chunk_count:
                    type: integer
                    description: Number of chunks.
                  token_count:
                    type: integer
                    description: Number of tokens.
                  dataset_id:
                    type: string
                    description: ID of the dataset.
                  chunk_method:
                    type: string
                    description: Chunking method used.
                  run:
                    type: string
                    description: Processing status.
    """
    if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
        logging.error(f"You don't own the dataset {dataset_id}. ")
        return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ")
    if request.args.get("type") == "filter":
        err_code, err_msg, payload, total = _get_doc_filters_with_request(request, dataset_id)
        if err_code != RetCode.SUCCESS:
            return get_data_error_result(code=err_code, message=err_msg)
        return get_json_result(data={"total": total, "filter": payload})
    err_code, err_msg, payload, total = _get_docs_with_request(request, dataset_id)
    if err_code != RetCode.SUCCESS:
        return get_data_error_result(code=err_code, message=err_msg)
    renamed_doc_list = [map_doc_keys(doc) for doc in payload]
    for doc_item in renamed_doc_list:
        if doc_item["thumbnail"] and not doc_item["thumbnail"].startswith(IMG_BASE64_PREFIX):
            doc_item["thumbnail"] = f"/api/v1/documents/images/{dataset_id}-{doc_item['thumbnail']}"
        if doc_item.get("source_type"):
            doc_item["source_type"] = doc_item["source_type"].split("/")[0]
        if doc_item["parser_config"].get("metadata"):
            doc_item["parser_config"]["metadata"] = turn2jsonschema(doc_item["parser_config"]["metadata"])
    return get_json_result(data={"total": total, "docs": renamed_doc_list})
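

# Illustrative sketch (hypothetical helper, not used by any route): the
# thumbnail rewrite performed by list_docs above. Inline base64 thumbnails pass
# through untouched; stored thumbnails become a relative image URL keyed by
# "<dataset_id>-<thumbnail>". The default prefix is an assumption standing in
# for IMG_BASE64_PREFIX, whose actual value is defined elsewhere.
def _sketch_thumbnail_url(dataset_id: str, thumbnail: str, base64_prefix: str = "data:image/png;base64,") -> str:
    if thumbnail and not thumbnail.startswith(base64_prefix):
        return f"/api/v1/documents/images/{dataset_id}-{thumbnail}"
    # Empty values and inline images are returned as-is.
    return thumbnail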


def _get_docs_with_request(req, dataset_id: str):
    """Get documents with request parameters from a dataset.

    This function extracts filtering parameters from the request and returns
    a list of documents matching the specified criteria.

    Args:
        req: The request object containing query parameters.
            - page (int): Page number for pagination (default: 1).
            - page_size (int): Number of documents per page (default: 30).
            - orderby (str): Field to order by (default: "create_time").
            - desc (bool): Whether to order in descending order (default: True).
            - keywords (str): Keywords to search in document names.
            - suffix (list): File suffix filters.
            - types (list): Document type filters.
            - run (list): Processing status filters.
            - create_time_from (int): Start timestamp for time range filter.
            - create_time_to (int): End timestamp for time range filter.
            - return_empty_metadata (bool|str): Whether to return documents with empty metadata.
            - metadata_condition (str): JSON string for complex metadata conditions.
            - metadata (str): JSON string for simple metadata key-value matching.
            - id (str): A single document ID; replaces any metadata-derived ID filter.
            - ids (list): Document IDs; replace any metadata-derived ID filter. Cannot be combined with ``id``.
            - name (str): Exact document name.
        dataset_id: The dataset ID to retrieve documents from.

    Returns:
        A tuple of (err_code, err_message, docs, total):
            - err_code (int): Success code (RetCode.SUCCESS) if successful, or error code if validation fails.
            - err_message (str): Empty string if successful, or error message if validation fails.
            - docs (list): List of document dictionaries matching the criteria, or empty list on error.
            - total (int): Total number of documents matching the criteria.

    Note:
        - The function supports filtering by document types, processing status, keywords, and time range.
        - Metadata filtering supports both simple key-value matching and complex conditions with operators.
        - The create_time range is applied to the fetched page after pagination, so ``total``
          and the page size do not account for it.
    """
    q = req.args
    page = int(q.get("page", 1))
    page_size = validate_rest_api_page_size(int(q.get("page_size", 30)))
    orderby = q.get("orderby", "create_time")
    desc = str(q.get("desc", "true")).strip().lower() != "false"
    keywords = q.get("keywords", "")
    # filters - align with OpenAPI parameter names
    suffix = q.getlist("suffix")
    types = q.getlist("types")
    if types:
        invalid_types = {t for t in types if t not in VALID_FILE_TYPES}
        if invalid_types:
            msg = f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}"
            return RetCode.DATA_ERROR, msg, [], 0
    run_status_converted, invalid_status = _parse_run_status_filter(q)
    if invalid_status:
        msg = f"Invalid filter run status conditions: {', '.join(invalid_status)}"
        return RetCode.DATA_ERROR, msg, [], 0
    err_code, err_message, doc_ids_filter, return_empty_metadata = _parse_doc_id_filter_with_metadata(q, dataset_id)
    if err_code != RetCode.SUCCESS:
        return err_code, err_message, [], 0
    doc_name = q.get("name")
    doc_id = q.get("id")
    if doc_id:
        if not DocumentService.query(id=doc_id, kb_id=dataset_id):
            return RetCode.DATA_ERROR, f"You don't own the document {doc_id}.", [], 0
        doc_ids_filter = [doc_id]  # an explicit id replaces the metadata-derived ID filter
    if doc_name and not DocumentService.query(name=doc_name, kb_id=dataset_id):
        return RetCode.DATA_ERROR, f"You don't own the document {doc_name}.", [], 0
    doc_ids = q.getlist("ids")
    if doc_id and len(doc_ids) > 0:
        # Return the full 4-tuple; callers unpack (err_code, err_message, docs, total).
        return RetCode.DATA_ERROR, f"Should not provide both 'id': {doc_id} and 'ids': {doc_ids}.", [], 0
    if len(doc_ids) > 0:
        doc_ids_filter = doc_ids
    docs, total = DocumentService.get_by_kb_id(dataset_id, page, page_size, orderby, desc, keywords, run_status_converted, types, suffix,
                                               name=doc_name, doc_ids=doc_ids_filter, return_empty_metadata=return_empty_metadata)
    # time range filter (0 means no bound); applied to this page only, after pagination
    create_time_from = int(q.get("create_time_from", 0))
    create_time_to = int(q.get("create_time_to", 0))
    if create_time_from or create_time_to:
        docs = [d for d in docs if (create_time_from == 0 or d.get("create_time", 0) >= create_time_from) and (create_time_to == 0 or d.get("create_time", 0) <= create_time_to)]
    return RetCode.SUCCESS, "", docs, total
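

# Illustrative sketch (hypothetical helper, not used by any route): the
# create_time range filter from _get_docs_with_request as a pure function. A
# bound of 0 disables that side of the range, and documents missing
# create_time are treated as 0. Like the original, it only filters the list it
# is given (the current page), not the whole dataset.
def _sketch_filter_by_create_time(docs: list, create_time_from: int = 0, create_time_to: int = 0) -> list:
    if not (create_time_from or create_time_to):
        return docs
    return [
        d
        for d in docs
        if (create_time_from == 0 or d.get("create_time", 0) >= create_time_from)
        and (create_time_to == 0 or d.get("create_time", 0) <= create_time_to)
    ]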


def _get_doc_filters_with_request(req, dataset_id: str):
    """Get aggregated document filters with request parameters from a dataset."""
    q = req.args
    keywords = q.get("keywords", "")
    suffix = q.getlist("suffix")
    types = q.getlist("types")
    if types:
        invalid_types = {t for t in types if t not in VALID_FILE_TYPES}
        if invalid_types:
            msg = f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}"
            return RetCode.DATA_ERROR, msg, {}, 0
    run_status_converted, invalid_status = _parse_run_status_filter(q)
    if invalid_status:
        msg = f"Invalid filter run status conditions: {', '.join(invalid_status)}"
        return RetCode.DATA_ERROR, msg, {}, 0
    docs_filter, total = DocumentService.get_filter_by_kb_id(
        dataset_id,
        keywords,
        run_status_converted,
        types,
        suffix,
    )
    return RetCode.SUCCESS, "", docs_filter, total
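

# Illustrative sketch (hypothetical helper, not used by any route): the
# "types" validation shared by both listing helpers above. `valid_types`
# stands in for VALID_FILE_TYPES, whose real contents live elsewhere. The
# invalid types are sorted here only to make the message deterministic; the
# original joins a set, so its ordering may vary between calls.
def _sketch_invalid_types_message(types: list, valid_types: set):
    invalid_types = sorted({t for t in types if t not in valid_types})
    if not invalid_types:
        return None
    return f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}"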


def _get_query_values(req_args, *names):
    values = []
    for name in names:
        values.extend(req_args.getlist(name))
        values.extend(req_args.getlist(f"{name}[]"))
    return [str(value).strip() for value in values if value is not None and str(value).strip()]


def _parse_run_status_filter(req_args):
    raw_statuses = _get_query_values(req_args, "run", "run_status")
    status_text_to_numeric = {status.name: status.value for status in TaskStatus}
    valid_statuses = set(status_text_to_numeric.values())
    converted = [status_text_to_numeric.get(status.upper(), status) for status in raw_statuses]
    invalid_statuses = {status for status in converted if status not in valid_statuses}
    return converted, invalid_statuses
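

# Illustrative sketch (hypothetical helper, not used by any route): run-status
# normalization as performed by _get_query_values and _parse_run_status_filter.
# Both plain ("run") and array-style ("run[]") keys are read, blank values are
# dropped, and text names map to numeric codes. `args` is a plain dict of lists
# standing in for the request's MultiDict. The name-to-code table is an
# assumption based on the ordering in the list_docs docstring; the real
# mapping comes from TaskStatus.
_SKETCH_TASK_STATUS = {"UNSTART": "0", "RUNNING": "1", "CANCEL": "2", "DONE": "3", "FAIL": "4"}


def _sketch_parse_run_status(args: dict):
    raw = []
    for name in ("run", "run_status"):
        raw.extend(args.get(name, []))
        raw.extend(args.get(f"{name}[]", []))
    raw = [str(v).strip() for v in raw if v is not None and str(v).strip()]
    valid = set(_SKETCH_TASK_STATUS.values())
    # Unknown names pass through unchanged and are then reported as invalid.
    converted = [_SKETCH_TASK_STATUS.get(s.upper(), s) for s in raw]
    invalid = {s for s in converted if s not in valid}
    return converted, invalid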


def _parse_doc_id_filter_with_metadata(req, kb_id):
    """Parse document ID filter based on metadata conditions from the request.

    This function extracts and processes metadata filtering parameters from the request
    and returns a list of document IDs that match the specified criteria. It supports
    two filtering modes: simple metadata key-value matching and complex metadata
    conditions with operators.

    Args:
        req: The request object containing filtering parameters.
            - return_empty_metadata (bool|str): If True, returns all documents regardless
              of their metadata. Can be a boolean or string "true"/"false".
            - metadata_condition (str): JSON string containing complex metadata conditions
              with optional "logic" (and/or) and "conditions" list. Each condition should
              have "name" (key), "comparison_operator", and "value" fields.
            - metadata (str): JSON string containing key-value pairs for exact metadata
              matching. Values can be a single value or list of values (OR logic within
              same key). Can include special key "empty_metadata" to indicate documents
              with empty metadata.
        kb_id: The knowledge base ID to filter documents from.

    Returns:
        A tuple of (err_code, err_message, docs, return_empty_metadata):
            - err_code (int): Success code (RetCode.SUCCESS) if successful, or error code if validation fails.
            - err_message (str): Empty string if successful, or error message if validation fails.
            - docs (list | None): List of document IDs matching the metadata criteria; None when
              no metadata filter applies; an empty list when the filters match no documents
              or on error.
            - return_empty_metadata (bool): The processed flag indicating whether to
              return documents with empty metadata.

    Note:
        - When both metadata and metadata_condition are provided, they are combined with AND logic.
        - The metadata_condition uses operators like: =, !=, >, <, >=, <=, contains, not contains,
          in, not in, start with, end with, empty, not empty.
        - The metadata parameter performs exact matching where values are OR'd within the same key
          and AND'd across different keys.

    Examples:
        Simple metadata filter (exact match):
            req = {"metadata": '{"author": ["John", "Jane"]}'}
            # Returns documents where author is John OR Jane
        Simple metadata filter with multiple keys:
            req = {"metadata": '{"author": "John", "status": "published"}'}
            # Returns documents where author is John AND status is published
        Complex metadata conditions:
            req = {"metadata_condition": '{"logic": "and", "conditions": [{"name": "status", "comparison_operator": "=", "value": "published"}]}'}
            # Returns documents where status equals "published"
        Complex conditions with multiple operators:
            req = {"metadata_condition": '{"logic": "or", "conditions": [{"name": "priority", "comparison_operator": "=", "value": "high"}, {"name": "status", "comparison_operator": "contains", "value": "urgent"}]}'}
            # Returns documents where priority is high OR status contains "urgent"
        Return empty metadata:
            req = {"return_empty_metadata": True}
            # Returns all documents regardless of metadata
        Combined metadata and metadata_condition:
            req = {"metadata": '{"author": "John"}', "metadata_condition": '{"logic": "and", "conditions": [{"name": "status", "comparison_operator": "=", "value": "published"}]}'}
            # Returns documents where author is John AND status equals published
    """
    return_empty_metadata = req.get("return_empty_metadata", False)
    if isinstance(return_empty_metadata, str):
        return_empty_metadata = return_empty_metadata.lower() == "true"
    try:
        metadata_condition = json.loads(req.get("metadata_condition", "{}"))
    except json.JSONDecodeError:
        msg = f'metadata_condition must be valid JSON: {req.get("metadata_condition")}.'
        return RetCode.DATA_ERROR, msg, [], return_empty_metadata
    try:
        metadata = json.loads(req.get("metadata", "{}"))
    except json.JSONDecodeError:
        logging.error(f'metadata must be valid JSON: {req.get("metadata")}.')
        return RetCode.DATA_ERROR, "metadata must be valid JSON.", [], return_empty_metadata
    if isinstance(metadata, dict) and metadata.get("empty_metadata"):
        return_empty_metadata = True
        metadata = {k: v for k, v in metadata.items() if k != "empty_metadata"}
    if return_empty_metadata:
        metadata_condition = {}
        metadata = {}
    else:
        if metadata_condition and not isinstance(metadata_condition, dict):
            return RetCode.DATA_ERROR, "metadata_condition must be an object.", [], return_empty_metadata
        if metadata and not isinstance(metadata, dict):
            return RetCode.DATA_ERROR, "metadata must be an object.", [], return_empty_metadata
    metas = dict()
    if metadata_condition or metadata:
        metas = DocMetadataService.get_flatted_meta_by_kbs([kb_id])
    doc_ids_filter = None
    if metadata_condition:
        doc_ids_filter = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")))
        if metadata_condition.get("conditions") and not doc_ids_filter:
            return RetCode.SUCCESS, "", [], return_empty_metadata
    if metadata:
        metadata_doc_ids = None
        for key, values in metadata.items():
            if not values:
                continue
            if not isinstance(values, list):
                values = [values]
            values = [str(v) for v in values if v is not None and str(v).strip()]
            if not values:
                continue
            key_doc_ids = set()
            for value in values:
                key_doc_ids.update(metas.get(key, {}).get(value, []))
            if metadata_doc_ids is None:
                metadata_doc_ids = key_doc_ids
            else:
                metadata_doc_ids &= key_doc_ids
            if not metadata_doc_ids:
                return RetCode.SUCCESS, "", [], return_empty_metadata
        if metadata_doc_ids is not None:
            if doc_ids_filter is None:
                doc_ids_filter = metadata_doc_ids
            else:
                doc_ids_filter &= metadata_doc_ids
            if not doc_ids_filter:
                return RetCode.SUCCESS, "", [], return_empty_metadata
    return RetCode.SUCCESS, "", list(doc_ids_filter) if doc_ids_filter is not None else None, return_empty_metadata
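

# Illustrative sketch (hypothetical helper, not used by any route): the simple
# "metadata" matching from _parse_doc_id_filter_with_metadata as a pure
# function. `metas` uses the shape the code above indexes into,
# {key: {value: [doc_id, ...]}}. Values under one key are OR'd, keys are AND'd,
# and keys with no usable values are skipped. Returns None when no key
# contributed a filter (meaning "no filter applied"), otherwise a set of IDs.
def _sketch_match_simple_metadata(metas: dict, metadata: dict):
    matched = None
    for key, values in metadata.items():
        if not values:
            continue
        if not isinstance(values, list):
            values = [values]
        values = [str(v) for v in values if v is not None and str(v).strip()]
        if not values:
            continue
        key_doc_ids = set()
        for value in values:
            key_doc_ids.update(metas.get(key, {}).get(value, []))  # OR within a key
        matched = key_doc_ids if matched is None else matched & key_doc_ids  # AND across keys
    return matched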


@manager.route("/datasets/<dataset_id>/documents", methods=["DELETE"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def delete_documents(tenant_id, dataset_id):
    """
    Delete documents from a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset containing the documents.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Document deletion parameters.
        required: true
        schema:
          type: object
          properties:
            ids:
              type: array
              items:
                type: string
              description: |
                IDs of the documents to delete. Only the listed documents are deleted.
                May be null or empty only when delete_all is true; providing both is an error.
            delete_all:
              type: boolean
              default: false
              description: Whether to delete all documents in the dataset.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    req, err = await validate_and_parse_json_request(request, DeleteDocumentReq)
    if err is not None or req is None:
        return get_error_argument_result(err)
    try:
        # Validate dataset exists and user has permission
        if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
            return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ")
        # Get documents to delete
        doc_ids = req.get("ids") or []
        delete_all = req.get("delete_all", False)
        if not delete_all and len(doc_ids) == 0:
            return get_error_data_result(message=f"should either provide doc ids or set delete_all(true), dataset: {dataset_id}. ")
        if len(doc_ids) > 0 and delete_all:
            return get_error_data_result(message=f"should not provide both doc ids and delete_all(true), dataset: {dataset_id}. ")
        if delete_all:
            doc_ids = [doc.id for doc in DocumentService.query(kb_id=dataset_id)]
        dataset_doc_ids = {doc.id for doc in DocumentService.query(kb_id=dataset_id)}
        invalid_ids = [doc_id for doc_id in doc_ids if doc_id not in dataset_doc_ids]
        if invalid_ids:
            return get_error_data_result(
                message=f"These documents do not belong to dataset {dataset_id} or Document not found: {', '.join(invalid_ids)}"
            )
        # Make sure each ID is deleted (and counted) only once; duplicates are logged, not rejected
        unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_ids, "document")
        if duplicate_messages:
            logging.warning(f"duplicate_messages:{duplicate_messages}")
        doc_ids = unique_doc_ids
        # Delete documents using existing FileService.delete_docs
        errors = await thread_pool_exec(FileService.delete_docs, doc_ids, tenant_id)
        if errors:
            return get_error_data_result(message=str(errors))
        return get_result(data={"deleted": len(doc_ids)})
    except Exception as e:
        logging.exception(e)
        return get_error_data_result(message="Internal server error")
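

# Illustrative sketch (hypothetical helper, not used by any route): a minimal
# order-preserving de-duplication of requested document IDs, in the spirit of
# check_duplicate_ids. This is not that function; its real message format is
# defined elsewhere and the wording below is an assumption. Python dicts keep
# insertion order, so the first occurrence of each ID wins.
def _sketch_dedupe_ids(ids: list):
    counts = {}
    for doc_id in ids:
        counts[doc_id] = counts.get(doc_id, 0) + 1
    duplicates = [f"Duplicate document ids: {doc_id}" for doc_id, n in counts.items() if n > 1]
    return list(counts), duplicates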


@manager.route("/datasets/<dataset_id>/documents/<document_id>/metadata/config", methods=["PUT"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def update_metadata_config(tenant_id, dataset_id, document_id):
    """
    Update document metadata configuration.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: path
        name: document_id
        type: string
        required: true
        description: ID of the document.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Metadata configuration.
        required: true
        schema:
          type: object
          properties:
            metadata:
              type: object
              description: Metadata configuration JSON.
    responses:
      200:
        description: Document updated successfully.
    """
    # Verify ownership and existence of dataset
    if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
        return get_error_data_result(message="You don't own the dataset.")
    # Verify document exists in the dataset
    doc = DocumentService.query(id=document_id, kb_id=dataset_id)
    if not doc:
        msg = f"Document {document_id} not found in dataset {dataset_id}"
        return get_error_data_result(message=msg)
    doc = doc[0]
    # Get request body
    req = await get_request_json()
    if "metadata" not in req:
        return get_error_argument_result(message="metadata is required")
    # Update parser config with metadata
    try:
        DocumentService.update_parser_config(doc.id, {"metadata": req["metadata"]})
    except Exception as e:
        logging.error("error when update_parser_config", exc_info=e)
        return get_json_result(code=RetCode.EXCEPTION_ERROR, message=repr(e))
    # Get updated document
    try:
        e, doc = DocumentService.get_by_id(doc.id)
        if not e:
            return get_data_error_result(message="Document not found!")
    except Exception as e:
        return get_json_result(code=RetCode.EXCEPTION_ERROR, message=repr(e))
    return get_result(data=doc.to_dict())


@manager.route("/thumbnails", methods=["GET"])  # noqa: F821
@login_required(auth_types=[AUTH_JWT, AUTH_API, AUTH_BETA])
def list_thumbnails():
    """
    Get thumbnails for documents.
    ---
    tags:
      - Documents
    parameters:
      - in: query
        name: doc_ids
        type: array
        items:
          type: string
        collectionFormat: multi
        required: true
        description: Document IDs to get thumbnails for. Repeat the parameter once per ID.
    responses:
      200:
        description: Successfully retrieved thumbnails, as a map from document ID to thumbnail URL or inline base64 image.
      400:
        description: Missing document IDs
    """
    # IMG_BASE64_PREFIX and DocumentService are already imported at module level.
    doc_ids = request.args.getlist("doc_ids")
    if not doc_ids:
        return get_json_result(data=False, message='Lack of "Document ID"', code=RetCode.ARGUMENT_ERROR)
    try:
        docs = DocumentService.get_thumbnails(doc_ids)
        for doc_item in docs:
            if doc_item["thumbnail"] and not doc_item["thumbnail"].startswith(IMG_BASE64_PREFIX):
                doc_item["thumbnail"] = f"/api/v1/documents/images/{doc_item['kb_id']}-{doc_item['thumbnail']}"
        return get_json_result(data={d["id"]: d["thumbnail"] for d in docs})
    except Exception as e:
        return server_error_response(e)
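

# Illustrative sketch (hypothetical client-side helper, not used by any route):
# /thumbnails reads IDs with request.args.getlist("doc_ids"), so a caller
# repeats the parameter once per ID. urllib.parse.urlencode with doseq=True
# produces exactly that shape. The "/api/v1" prefix is an assumption; use
# whatever prefix the API is actually mounted under.
def _sketch_thumbnails_url(doc_ids: list, base: str = "/api/v1/thumbnails") -> str:
    from urllib.parse import urlencode

    return f"{base}?{urlencode({'doc_ids': doc_ids}, doseq=True)}"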
@manager.route("/datasets/<dataset_id>/documents/metadatas", methods=["PATCH"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def update_metadata(tenant_id, dataset_id):
"""
Update document metadata in batch.
---
tags:
- Documents
security:
- ApiKeyAuth: []
parameters:
- in: path
name: dataset_id
type: string
required: true
description: ID of the dataset.
- in: header
name: Authorization
type: string
required: true
description: Bearer token for authentication.
- in: body
name: body
description: Metadata update request.
required: true
schema:
type: object
properties:
selector:
type: object
description: Document selector.
properties:
document_ids:
type: array
items:
type: string
description: List of document IDs to update.
metadata_condition:
type: object
description: Filter documents by existing metadata.
updates:
type: array
items:
type: object
properties:
key:
type: string
value:
type: any
description: List of metadata key-value pairs to update.
deletes:
type: array
items:
type: object
properties:
key:
type: string
description: List of metadata keys to delete.
responses:
200:
description: Metadata updated successfully.
"""
# Verify ownership of dataset
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
# Get request body
req = await get_request_json()
selector = req.get("selector", {}) or {}
updates = req.get("updates", []) or []
deletes = req.get("deletes", []) or []
# Validate selector
if not isinstance(selector, dict):
return get_error_data_result(message="selector must be an object.")
if not isinstance(updates, list) or not isinstance(deletes, list):
return get_error_data_result(message="updates and deletes must be lists.")
# Validate metadata_condition
metadata_condition = selector.get("metadata_condition", {}) or {}
if metadata_condition and not isinstance(metadata_condition, dict):
return get_error_data_result(message="metadata_condition must be an object.")
# Validate document_ids
document_ids = selector.get("document_ids", []) or []
if document_ids and not isinstance(document_ids, list):
return get_error_data_result(message="document_ids must be a list.")
# Validate updates
for upd in updates:
if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd:
return get_error_data_result(message="Each update requires key and value.")
# Validate deletes
for d in deletes:
if not isinstance(d, dict) or not d.get("key"):
return get_error_data_result(message="Each delete requires key.")
# Initialize target document IDs
target_doc_ids = set()
# If document_ids provided, validate they belong to the dataset
if document_ids:
kb_doc_ids = KnowledgebaseService.list_documents_by_ids([dataset_id])
invalid_ids = set(document_ids) - set(kb_doc_ids)
if invalid_ids:
return get_error_data_result(
message=f"These documents do not belong to dataset {dataset_id}: {', '.join(invalid_ids)}"
)
target_doc_ids = set(document_ids)
# Apply metadata_condition filtering if provided
if metadata_condition:
metas = DocMetadataService.get_flatted_meta_by_kbs([dataset_id])
filtered_ids = set(
meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))
)
target_doc_ids = target_doc_ids & filtered_ids
        if metadata_condition.get("conditions") and not target_doc_ids:
            return get_result(data={"updated": 0, "matched_docs": 0})
    # Convert to list and perform update
    target_doc_ids = list(target_doc_ids)
    updated = DocMetadataService.batch_update_metadata(dataset_id, target_doc_ids, updates, deletes)
    return get_result(data={"updated": updated, "matched_docs": len(target_doc_ids)})
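

# Illustrative sketch (not used by any route): the request-shape checks performed
# above, restated as a pure function so the accepted payload is easy to see. It
# assumes that the preceding check (not shown here) is `isinstance(selector, dict)`.
# The helper name is hypothetical. It returns the first error message, or None
# when selector, updates and deletes are well formed.
def _sketch_validate_metadata_payload(selector, updates, deletes):
    if not isinstance(selector, dict):
        return "selector must be an object."
    if not isinstance(updates, list) or not isinstance(deletes, list):
        return "updates and deletes must be lists."
    metadata_condition = selector.get("metadata_condition", {}) or {}
    if metadata_condition and not isinstance(metadata_condition, dict):
        return "metadata_condition must be an object."
    document_ids = selector.get("document_ids", []) or []
    if document_ids and not isinstance(document_ids, list):
        return "document_ids must be a list."
    # Every update needs a non-empty key and an explicit "value" entry (None is allowed)
    for upd in updates:
        if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd:
            return "Each update requires key and value."
    # A delete only needs the key to remove
    for d in deletes:
        if not isinstance(d, dict) or not d.get("key"):
            return "Each delete requires key."
    return None


# Example:
#   _sketch_validate_metadata_payload({"document_ids": ["doc1"]},
#                                     [{"key": "author", "value": "Ada"}],
#                                     [{"key": "draft"}])  -> None
#   _sketch_validate_metadata_payload({}, [{"key": "author"}], [])
#       -> "Each update requires key and value."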


@manager.route("/documents/ingest", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def ingest(tenant_id):
    req = await get_request_json()
    try:
        user_id = tenant_id
        error_code, error_message = await thread_pool_exec(_run_sync, user_id, req)
        if error_code:
            logging.error(f"Error ingesting documents: {req}, error message: {error_message}")
            return get_json_result(error_code, error_message)
        return get_json_result(data=True)
    except Exception as e:
        logging.exception("document ingest/run failed")
        return server_error_response(e)


def _run_sync(user_id: str, req):
    for doc_id in req["doc_ids"]:
        if not DocumentService.accessible(doc_id, user_id):
            return RetCode.AUTHENTICATION_ERROR, "No authorization."
    kb_table_num_map = {}
    for doc_id in req["doc_ids"]:
        info = {"run": str(req["run"]), "progress": 0}
        rerun_with_delete = str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False)
        if rerun_with_delete:
            info["progress_msg"] = ""
            info["chunk_num"] = 0
            info["token_num"] = 0
        doc_tenant_id = DocumentService.get_tenant_id(doc_id)
        if not doc_tenant_id:
            return RetCode.DATA_ERROR, "Tenant not found!"
        e, doc = DocumentService.get_by_id(doc_id)
        if not e:
            return RetCode.DATA_ERROR, "Document not found!"
        if str(req["run"]) == TaskStatus.CANCEL.value:
            tasks = list(TaskService.query(doc_id=doc_id))
            has_unfinished_task = any((task.progress or 0) < 1 for task in tasks)
            if str(doc.run) in [TaskStatus.RUNNING.value, TaskStatus.CANCEL.value] or has_unfinished_task:
                cancel_all_task_of(doc_id)
            else:
                return RetCode.DATA_ERROR, "Cannot cancel a task that is not in RUNNING status"
        if rerun_with_delete and str(doc.run) == TaskStatus.DONE.value:
            DocumentService.clear_chunk_num_when_rerun(doc_id)
        DocumentService.update_by_id(doc_id, info)
        if req.get("delete", False):
            TaskService.filter_delete([Task.doc_id == doc_id])
            if settings.docStoreConn.index_exist(search.index_name(doc_tenant_id), doc.kb_id):
                settings.docStoreConn.delete({"doc_id": doc_id}, search.index_name(doc_tenant_id), doc.kb_id)
        if str(req["run"]) == TaskStatus.RUNNING.value:
            if req.get("apply_kb"):
                e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
                if not e:
                    raise LookupError("Can't find this dataset!")
                doc.parser_config["llm_id"] = kb.parser_config.get("llm_id")
                doc.parser_config["enable_metadata"] = kb.parser_config.get("enable_metadata", False)
                doc.parser_config["metadata"] = kb.parser_config.get("metadata", {})
                DocumentService.update_parser_config(doc.id, doc.parser_config)
            doc_dict = doc.to_dict()
            DocumentService.run(doc_tenant_id, doc_dict, kb_table_num_map)
    return None, None
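

# Illustrative sketch (hypothetical helper, not called by the routes): how
# `_run_sync` above builds the per-document update passed to
# `DocumentService.update_by_id`. The status code is passed in instead of being
# read from `TaskStatus`. The default "1" for RUNNING is an assumption about that enum.
def _sketch_build_run_info(run, delete=False, running_value="1"):
    info = {"run": str(run), "progress": 0}
    # Counters and the progress message are cleared only for "re-run and delete previous results"
    if str(run) == running_value and delete:
        info["progress_msg"] = ""
        info["chunk_num"] = 0
        info["token_num"] = 0
    return info


# Example: _sketch_build_run_info(1, delete=True)
#   -> {"run": "1", "progress": 0, "progress_msg": "", "chunk_num": 0, "token_num": 0}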


@manager.route("/datasets/<dataset_id>/documents/parse", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def parse_documents(tenant_id, dataset_id):
    """
    Start parsing documents in a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Document parse parameters.
        required: true
        schema:
          type: object
          properties:
            document_ids:
              type: array
              items:
                type: string
              description: List of document IDs to parse.
    responses:
      200:
        description: Successful operation.
    """
    if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
        return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
    req = await get_request_json()
    if req is None:
        return get_error_data_result(message="Request body is required")
    document_ids = req.get("document_ids")
    if document_ids is None or not isinstance(document_ids, list):
        return get_error_data_result(message="`document_ids` is required")
    if len(document_ids) == 0:
        return get_error_data_result(message="`document_ids` is required")
    # Check for duplicate document IDs
    unique_doc_ids, duplicate_messages = check_duplicate_ids(document_ids, "document")
    errors = duplicate_messages if duplicate_messages else []
    # Validate all document IDs belong to the dataset
    not_found_ids = []
    valid_doc_ids = []
    for doc_id in unique_doc_ids:
        docs = DocumentService.query(kb_id=dataset_id, id=doc_id)
        if not docs:
            not_found_ids.append(doc_id)
        else:
            valid_doc_ids.append(doc_id)
    if not_found_ids:
        errors.append(f"Documents not found: {not_found_ids}")
    # Still parse valid documents, but return error code
    if not valid_doc_ids:
        return get_error_data_result(message=f"Documents not found: {not_found_ids}")
    try:
        def _run_sync():
            kb_table_num_map = {}
            success_count = 0
            for doc_id in valid_doc_ids:
                e, doc = DocumentService.get_by_id(doc_id)
                if not e:
                    errors.append(f"Document not found: {doc_id}")
                    continue
                info = {"run": str(TaskStatus.RUNNING.value), "progress": 0}
                # If re-running a completed document, clear previous chunks
                if str(doc.run) == TaskStatus.DONE.value:
                    DocumentService.clear_chunk_num_when_rerun(doc.id)
                info["progress_msg"] = ""
                info["chunk_num"] = 0
                info["token_num"] = 0
                DocumentService.update_by_id(doc_id, info)
                TaskService.filter_delete([Task.doc_id == doc_id])
                if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id):
                    settings.docStoreConn.delete({"doc_id": doc_id}, search.index_name(tenant_id), doc.kb_id)
                doc_dict = doc.to_dict()
                DocumentService.run(tenant_id, doc_dict, kb_table_num_map)
                success_count += 1
            result = {"success_count": success_count}
            if errors:
                result["errors"] = errors
            return result

        result = await thread_pool_exec(_run_sync)
        if not_found_ids:
            return get_error_data_result(message=f"Documents not found: {not_found_ids}")
        return get_result(data=result)
    except Exception as e:
        logging.exception(e)
        return get_error_data_result(message="Internal server error")
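

# Illustrative sketch of the ID bookkeeping in `parse_documents` and
# `stop_parse_documents`. It de-duplicates the requested IDs while keeping their
# order, then splits them into IDs that exist in the dataset and IDs that do not.
# `check_duplicate_ids` is not shown in this file, so the duplicate message below
# is a hypothetical stand-in. `existing_ids` stands in for the per-ID
# `DocumentService.query(kb_id=..., id=...)` lookup. The helper name is hypothetical.
def _sketch_partition_doc_ids(document_ids, existing_ids):
    seen = set()
    unique_ids, duplicates = [], []
    for doc_id in document_ids:
        if doc_id in seen:
            # Report each duplicated ID once, however often it repeats
            if doc_id not in duplicates:
                duplicates.append(doc_id)
            continue
        seen.add(doc_id)
        unique_ids.append(doc_id)
    valid_ids = [doc_id for doc_id in unique_ids if doc_id in existing_ids]
    not_found_ids = [doc_id for doc_id in unique_ids if doc_id not in existing_ids]
    errors = [f"Duplicate document ids: {doc_id}" for doc_id in duplicates]
    if not_found_ids:
        errors.append(f"Documents not found: {not_found_ids}")
    return valid_ids, not_found_ids, errors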


@manager.route("/datasets/<dataset_id>/documents/stop", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def stop_parse_documents(tenant_id, dataset_id):
    """
    Stop parsing documents in a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Document stop parse parameters.
        required: true
        schema:
          type: object
          properties:
            document_ids:
              type: array
              items:
                type: string
              description: List of document IDs to stop parsing.
    responses:
      200:
        description: Successful operation.
    """
    if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
        return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
    req = await get_request_json()
    if req is None:
        return get_error_data_result(message="Request body is required")
    document_ids = req.get("document_ids")
    if document_ids is None or not isinstance(document_ids, list):
        return get_error_data_result(message="`document_ids` is required")
    if len(document_ids) == 0:
        return get_error_data_result(message="`document_ids` is required")
    # Check for duplicate document IDs
    unique_doc_ids, duplicate_messages = check_duplicate_ids(document_ids, "document")
    errors = duplicate_messages if duplicate_messages else []
    # Validate all document IDs belong to the dataset
    not_found_ids = []
    valid_doc_ids = []
    for doc_id in unique_doc_ids:
        docs = DocumentService.query(kb_id=dataset_id, id=doc_id)
        if not docs:
            not_found_ids.append(doc_id)
        else:
            valid_doc_ids.append(doc_id)
    if not_found_ids:
        return get_error_data_result(message=f"Documents not found: {not_found_ids}")
    try:
        def _run_sync():
            success_count = 0
            for doc_id in valid_doc_ids:
                e, doc = DocumentService.get_by_id(doc_id)
                if not e:
                    errors.append(f"Document not found: {doc_id}")
                    continue
                # Check if the document is currently running
                tasks = list(TaskService.query(doc_id=doc_id))
                has_unfinished_task = any((task.progress or 0) < 1 for task in tasks)
                if str(doc.run) not in [TaskStatus.RUNNING.value, TaskStatus.CANCEL.value] and not has_unfinished_task:
                    errors.append("Can't stop parsing document that has not started or already completed")
                    continue
                cancel_all_task_of(doc_id)
                DocumentService.update_by_id(
                    doc_id,
                    {
                        "run": str(TaskStatus.CANCEL.value),
                        "progress": 0,
                        "chunk_num": 0,
                    },
                )
                index_name = search.index_name(tenant_id)
                if settings.docStoreConn.index_exist(index_name, doc.kb_id):
                    settings.docStoreConn.delete({"doc_id": doc.id}, index_name, doc.kb_id)
                success_count += 1
            result = {"success_count": success_count}
            if errors:
                result["errors"] = errors
            return result

        result = await thread_pool_exec(_run_sync)
        if not_found_ids:
            return get_error_data_result(message=f"Documents not found: {not_found_ids}")
        return get_result(data=result)
    except Exception as e:
        logging.exception(e)
        return get_error_data_result(message="Internal server error")
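

# Illustrative sketch of the stop-eligibility rule in `stop_parse_documents`. A
# document can be stopped when its `run` status is RUNNING or CANCEL, or when any
# of its tasks has progress below 1. A missing progress counts as 0. The status
# strings "1" (RUNNING) and "2" (CANCEL) are assumed defaults, not read from
# `TaskStatus`. The helper name is hypothetical.
def _sketch_can_stop(doc_run, task_progresses, running_value="1", cancel_value="2"):
    has_unfinished_task = any((progress or 0) < 1 for progress in task_progresses)
    return str(doc_run) in (running_value, cancel_value) or has_unfinished_task


# Example: _sketch_can_stop("3", [1.0, None]) -> True (one task never reported progress)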


def _parse_document_image_id(image_id: str) -> tuple[str, str] | None:
    """Split a composite document image ID into storage bucket and object key.

    Thumbnail URLs use ``{dataset_id}-{thumbnail}``. Only the first hyphen
    separates the dataset/kb id (bucket) from the object key, which may
    contain additional hyphens (e.g. ``page-1.png``).

    Args:
        image_id: Path segment from ``GET /documents/images/<image_id>``.

    Returns:
        ``(bucket, object_key)`` when valid, otherwise ``None``.
    """
    parts = image_id.split("-", 1)
    if len(parts) != 2 or not parts[0] or not parts[1]:
        return None
    return parts[0], parts[1]
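

# Illustrative sketch: the inverse of `_parse_document_image_id`. Only the first
# hyphen is treated as the separator, so a round trip is lossless only when the
# bucket (dataset id) contains no hyphen. This sketch assumes dataset ids are
# hyphen-free, for example hex strings. The helper name is hypothetical.
def _sketch_build_document_image_id(bucket: str, object_key: str) -> str:
    if not bucket or "-" in bucket:
        raise ValueError("bucket must be non-empty and must not contain '-'")
    if not object_key:
        raise ValueError("object_key must be non-empty")
    return f"{bucket}-{object_key}"


# Example: _sketch_build_document_image_id("kb123", "page-1.png") -> "kb123-page-1.png",
# and "kb123-page-1.png".split("-", 1) -> ["kb123", "page-1.png"].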


def _detect_image_content_type_from_bytes(data):
    """Return an image MIME type sniffed from magic bytes, or None if unrecognized."""
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if data[:3] == b"\xff\xd8\xff":
        return "image/jpeg"
    if data[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    if len(data) >= 12 and data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "image/webp"
    if data[:2] == b"BM":
        return "image/bmp"
    return None


def _content_type_for_document_image(object_name, data):
    """Infer Content-Type from the object key extension, then magic bytes, else octet-stream."""
    ext_match = re.search(r"\.([^.]+)$", object_name.lower())
    if ext_match:
        content_type = CONTENT_TYPE_MAP.get(ext_match.group(1))
        if content_type and content_type.startswith("image/"):
            return content_type
    detected = _detect_image_content_type_from_bytes(data)
    if detected:
        return detected
    return "application/octet-stream"
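

# Illustrative sketch of the precedence used by `_content_type_for_document_image`.
# An image MIME type derived from the object key's extension wins, then magic
# bytes, then `application/octet-stream`. `CONTENT_TYPE_MAP` is defined elsewhere,
# so this stand-alone version uses the standard-library `mimetypes` module instead.
# It checks only the PNG and JPEG signatures. The helper name is hypothetical.
def _sketch_guess_image_type(object_name: str, data: bytes) -> str:
    import mimetypes

    guessed, _ = mimetypes.guess_type(object_name.lower())
    if guessed and guessed.startswith("image/"):
        return guessed
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if data[:3] == b"\xff\xd8\xff":
        return "image/jpeg"
    return "application/octet-stream"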


@manager.route("/documents/images/<image_id>", methods=["GET"])  # noqa: F821
@login_required(auth_types=[AUTH_JWT, AUTH_API, AUTH_BETA])
async def get_document_image(image_id):
    """
    Get a document image by ID.
    ---
    tags:
      - Documents
    parameters:
      - name: image_id
        in: path
        required: true
        schema:
          type: string
        description: Composite ID ``{dataset_id}-{thumbnail_object_key}`` (split on first hyphen only)
    responses:
      200:
        description: Image file. Content-Type is inferred from the object key extension or the image's magic bytes.
        content:
          image/*:
            schema:
              type: string
              format: binary
    """
    try:
        parsed = _parse_document_image_id(image_id)
        if not parsed:
            return get_data_error_result(message="Image not found.")
        bkt, nm = parsed
        data = await thread_pool_exec(settings.STORAGE_IMPL.get, bkt, nm)
        if not data:
            return get_data_error_result(message="Image not found.")
        content_type = _content_type_for_document_image(nm, data)
        response = await make_response(data)
        response.headers.set("Content-Type", content_type)
        return response
    except Exception as e:
        return server_error_response(e)
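

# Illustrative sketch of the `get_document_image` flow. An in-memory dict keyed
# by (bucket, key) stands in for `settings.STORAGE_IMPL`; this is an assumption
# for the example, since the real storage call runs in a thread pool. It returns
# (content_type, data) on success, or None where the route answers "Image not found.".
# Only the PNG signature is sniffed, to keep it short. The helper name is hypothetical.
def _sketch_fetch_document_image(image_id: str, storage: dict) -> tuple[str, bytes] | None:
    parts = image_id.split("-", 1)
    if len(parts) != 2 or not parts[0] or not parts[1]:
        return None  # malformed composite id
    bucket, key = parts
    data = storage.get((bucket, key))
    if not data:
        return None  # missing blob: guard before building a response
    content_type = "image/png" if data.startswith(b"\x89PNG\r\n\x1a\n") else "application/octet-stream"
    return content_type, data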


ARTIFACT_CONTENT_TYPES = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".svg": "image/svg+xml",
    ".pdf": "application/pdf",
    ".csv": "text/csv",
    ".json": "application/json",
    ".html": "text/html",
}


@DB.connection_context()
def _sandbox_artifact_dialog_ids_for_user(filename: str, user_id: str) -> list[str]:
    """Return agent dialog IDs for sessions owned by *user_id* that reference *filename*."""
    if not filename:
        return []
    artifact_ref = f"documents/artifact/{filename}"
    rows = (
        API4Conversation.select(API4Conversation.dialog_id)
        .where(
            ((API4Conversation.user_id == user_id) | (API4Conversation.exp_user_id == user_id)),
            (API4Conversation.message.contains(filename) | API4Conversation.message.contains(artifact_ref)),
        )
        .distinct()
    )
    return [row.dialog_id for row in rows if row.dialog_id]


def _sandbox_artifact_accessible(filename: str, user_id: str) -> bool:
    """True when a CodeExec sandbox artifact belongs to an agent session the user may access."""
    for dialog_id in _sandbox_artifact_dialog_ids_for_user(filename, user_id):
        if UserCanvasService.accessible(dialog_id, user_id):
            return True
    return False
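

# Illustrative sketch of the ownership rule behind `_sandbox_artifact_accessible`,
# using plain dicts instead of `API4Conversation` rows. It collects the distinct
# dialog ids of sessions the user owns (as `user_id` or `exp_user_id`) whose
# message text mentions the artifact. It then grants access if any of those
# dialogs passes `can_access`, a predicate standing in for
# `UserCanvasService.accessible`. Matching on the bare filename also covers the
# `documents/artifact/<filename>` form, since that string contains it. The helper
# name and the dict field layout are hypothetical.
def _sketch_artifact_accessible(filename, user_id, sessions, can_access):
    if not filename:
        return False
    dialog_ids = []
    for session in sessions:
        owned = user_id in (session.get("user_id"), session.get("exp_user_id"))
        if owned and filename in (session.get("message") or ""):
            dialog_id = session.get("dialog_id")
            if dialog_id and dialog_id not in dialog_ids:
                dialog_ids.append(dialog_id)
    return any(can_access(dialog_id, user_id) for dialog_id in dialog_ids)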


@manager.route("/documents/artifact/<filename>", methods=["GET"])  # noqa: F821
@login_required
async def get_artifact(filename):
    """
    Get an artifact file.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: filename
        type: string
        required: true
        description: Name of the artifact file.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Artifact file returned successfully.
    """
    from common import settings

    try:
        bucket = SANDBOX_ARTIFACT_BUCKET
        # Validate filename: must be a bare basename (no path components) with an allowed extension
        basename = os.path.basename(filename)
        if basename != filename or "/" in filename or "\\" in filename:
            return get_data_error_result(message="Invalid filename.")
        ext = os.path.splitext(basename)[1].lower()
        if ext not in ARTIFACT_CONTENT_TYPES:
            return get_data_error_result(message="Invalid file type.")
        if not await thread_pool_exec(_sandbox_artifact_accessible, basename, current_user.id):
            return get_data_error_result(message="Artifact not found.")
        data = await thread_pool_exec(settings.STORAGE_IMPL.get, bucket, basename)
        if not data:
            return get_data_error_result(message="Artifact not found.")
        content_type = ARTIFACT_CONTENT_TYPES.get(ext, "application/octet-stream")
        response = await make_response(data)
        safe_filename = re.sub(r"[^\w.\-]", "_", basename)
        apply_safe_file_response_headers(response, content_type, ext)
        if not response.headers.get("Content-Disposition"):
            response.headers.set("Content-Disposition", f'inline; filename="{safe_filename}"')
        return response
    except Exception as e:
        return server_error_response(e)
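

# Illustrative sketch of the filename checks in `get_artifact`. It rejects
# anything that is not a bare basename, requires an allowed extension
# (case-insensitive), and sanitizes the name used in `Content-Disposition`. The
# helper name is hypothetical, and the allowed-extension mapping is passed in. It
# returns (content_type, safe_filename), or None for a rejected name.
def _sketch_check_artifact_filename(filename: str, content_types: dict):
    import os
    import re

    basename = os.path.basename(filename)
    if basename != filename or "/" in filename or "\\" in filename:
        return None  # path components are never accepted
    ext = os.path.splitext(basename)[1].lower()
    if ext not in content_types:
        return None
    # Keep word characters, dots and hyphens; replace everything else with "_"
    safe_filename = re.sub(r"[^\w.\-]", "_", basename)
    return content_types[ext], safe_filename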


@manager.route("/datasets/<dataset_id>/documents/batch-update-status", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def batch_update_document_status(tenant_id, dataset_id):
    """
    Batch update status of documents within a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Document status update parameters.
        required: true
        schema:
          type: object
          required:
            - doc_ids
            - status
          properties:
            doc_ids:
              type: array
              items:
                type: string
              description: List of document IDs to update.
            status:
              type: string
              enum: ["0", "1"]
              description: New status (0 = disabled, 1 = enabled).
    responses:
      200:
        description: Document statuses updated successfully.
    """
    req = await get_request_json()
    doc_ids = req.get("doc_ids", [])
    if not isinstance(doc_ids, list) or not doc_ids:
        return get_error_argument_result(message='"doc_ids" must be a non-empty list.')
    if any(not isinstance(doc_id, str) or not doc_id for doc_id in doc_ids):
        return get_error_argument_result(message='"doc_ids" must contain non-empty document IDs.')
    status = str(req.get("status", -1))
    if status not in ["0", "1"]:
        return get_error_argument_result(message=f'"Status" must be either 0 or 1:{status}!')
    # Verify dataset ownership
    if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
        return get_error_data_result(message="You don't own the dataset.")
    e, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not e:
        return get_error_data_result(message="Can't find this dataset!")
    result = {}
    has_error = False
    for doc_id in doc_ids:
        try:
            e, doc = DocumentService.get_by_id(doc_id)
            if not e:
                result[doc_id] = {"error": "Document not found"}
                has_error = True
                continue
            if doc.kb_id != dataset_id:
logging.warning(f"Document {doc.kb_id} not in dataset {dataset_id}")
result[doc_id] = {"error": "Document not found in this dataset."}
has_error = True
continue
current_status = str(doc.status)
if current_status == status:
result[doc_id] = {"status": status}
continue
if not DocumentService.update_by_id(doc_id, {"status": str(status)}):
result[doc_id] = {"error": "Database error (Document update)!"}
has_error = True
continue
status_int = int(status)
if getattr(doc, "chunk_num", 0) > 0:
try:
ok = settings.docStoreConn.update(
{"doc_id": doc_id},
{"available_int": status_int},
search.index_name(kb.tenant_id),
doc.kb_id,
)
except Exception as exc:
msg = str(exc)
if "3022" in msg:
result[doc_id] = {"error": "Document store table missing."}
else:
result[doc_id] = {"error": f"Document store update failed: {msg}"}
has_error = True
continue
if not ok:
result[doc_id] = {"error": "Database error (docStore update)!"}
has_error = True
continue
result[doc_id] = {"status": status}
except Exception as e:
result[doc_id] = {"error": f"Internal server error: {str(e)}"}
has_error = True
if has_error:
return get_json_result(data=result, message="Partial failure", code=RetCode.SERVER_ERROR)
return get_json_result(data=result)
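

# Illustrative sketch (not used by any route): the request-body rules that
# batch_update_document_status enforces before touching the database, as a pure function.
# `_example_validate_status_update` is hypothetical; it returns an error string, or None when
# the body is acceptable. Like the handler, it coerces `status` with str(), so clients may send
# 0/1 either as integers or as strings.
def _example_validate_status_update(body):
    doc_ids = body.get("doc_ids", [])
    if not isinstance(doc_ids, list) or not doc_ids:
        return '"doc_ids" must be a non-empty list.'
    if any(not isinstance(doc_id, str) or not doc_id for doc_id in doc_ids):
        return '"doc_ids" must contain non-empty document IDs.'
    # A missing status defaults to -1, which is rejected.
    if str(body.get("status", -1)) not in ("0", "1"):
        return '"status" must be either 0 or 1.'
    return None

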


@manager.route("/documents/<doc_id>/preview", methods=["GET"])  # noqa: F821
@login_required(auth_types=[AUTH_JWT, AUTH_API, AUTH_BETA])
async def get(doc_id):
    """Return the raw file bytes for a document the requesting user is authorized to read.

    The user must belong to the tenant that owns the document's knowledge base; otherwise
    the response is indistinguishable from a missing document, to avoid cross-tenant ID
    enumeration.
    """
    try:
        if not DocumentService.accessible(doc_id, current_user.id):
            return get_data_error_result(message="Document not found!")
        e, doc = DocumentService.get_by_id(doc_id)
        if not e:
            return get_data_error_result(message="Document not found!")
        bucket, name = File2DocumentService.get_storage_address(doc_id=doc_id)
        data = await thread_pool_exec(settings.STORAGE_IMPL.get, bucket, name)
        if not data:
            return get_data_error_result(message="This file is empty.")
        response = await make_response(data)
        ext = re.search(r"\.([^.]+)$", doc.name.lower())
        ext = ext.group(1) if ext else None
        content_type = None
        if ext:
            fallback_prefix = "image" if doc.type == FileType.VISUAL.value else "application"
            content_type = CONTENT_TYPE_MAP.get(ext, f"{fallback_prefix}/{ext}")
        apply_safe_file_response_headers(response, content_type, ext)
        return response
    except Exception as e:
        return server_error_response(e)
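

# Illustrative sketch (not used by any route): the anti-enumeration pattern the preview handler
# follows. `_example_lookup_readable` and its `docs` / `can_read` arguments are hypothetical
# stand-ins for DocumentService.get_by_id and DocumentService.accessible. A forbidden ID and a
# nonexistent ID produce the same result, so a caller cannot learn which document IDs exist in
# other tenants.
def _example_lookup_readable(doc_id, user_id, docs, can_read):
    # Check access first, so a forbidden document is never distinguished from a missing one.
    if not can_read(doc_id, user_id):
        return None, "Document not found!"
    doc = docs.get(doc_id)
    if doc is None:
        return None, "Document not found!"
    return doc, None

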
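

def _mimetype_for_document(doc) -> str:
    """Guess a MIME type from the document's file extension."""
    match = re.search(r"\.([^.]+)$", (doc.name or "").lower())
    if not match:
        return "application/octet-stream"
    ext = match.group(1)
    # Unknown extensions fall back to image/<ext> for visual files and application/<ext> otherwise.
    fallback_prefix = "image" if doc.type == FileType.VISUAL.value else "application"
    return CONTENT_TYPE_MAP.get(ext, f"{fallback_prefix}/{ext}")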


@manager.route("/datasets/<dataset_id>/documents/<document_id>", methods=["GET"])  # noqa: F821
@login_required
async def download(dataset_id, document_id):
    """
    Download a document from a dataset.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    produces:
      - application/octet-stream
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: path
        name: document_id
        type: string
        required: true
        description: ID of the document to download.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Document file stream.
        schema:
          type: file
      400:
        description: Error message.
        schema:
          type: object
    """
    if not document_id:
        return get_error_data_result(message="Specify document_id please.")

    # A dataset or document the caller cannot access is reported exactly like a missing one.
    if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=current_user.id):
        return get_data_error_result(message="Document not found!")
    if not DocumentService.accessible(document_id, current_user.id):
        return get_data_error_result(message="Document not found!")
    doc = DocumentService.query(kb_id=dataset_id, id=document_id)
    if not doc:
        return get_error_data_result(message=f"The dataset does not own the document {document_id}.")

    # Resolve the object-storage (e.g. MinIO) bucket and object name, then read the bytes
    # off the event loop, as the preview handler does.
    bucket, location = File2DocumentService.get_storage_address(doc_id=document_id)
    file_stream = await thread_pool_exec(settings.STORAGE_IMPL.get, bucket, location)
    if not file_stream:
        return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)
    file = BytesIO(file_stream)
    # Use send_file with a proper filename and MIME type
    return await send_file(
        file,
        as_attachment=True,
        attachment_filename=doc[0].name,
        mimetype=_mimetype_for_document(doc[0]),
    )


@manager.route("/documents/<document_id>", methods=["GET"])  # noqa: F821
@login_required
async def download_document(document_id):
    """
    Download a document.
    ---
    tags:
      - Documents
    security:
      - ApiKeyAuth: []
    produces:
      - application/octet-stream
    parameters:
      - in: path
        name: document_id
        type: string
        required: true
        description: ID of the document to download.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Document file stream.
        schema:
          type: file
      400:
        description: Error message.
        schema:
          type: object
    """
    if not document_id:
        return get_error_data_result(message="Specify document_id please.")

    # A document the caller cannot access is reported exactly like a missing one.
    if not DocumentService.accessible(document_id, current_user.id):
        return get_data_error_result(message="Document not found!")
    doc = DocumentService.query(id=document_id)
    if not doc:
        return get_error_data_result(message=f"Document {document_id} not found.")

    # Resolve the object-storage (e.g. MinIO) bucket and object name, then read the bytes
    # off the event loop, as the preview handler does.
    bucket, location = File2DocumentService.get_storage_address(doc_id=document_id)
    file_stream = await thread_pool_exec(settings.STORAGE_IMPL.get, bucket, location)
    if not file_stream:
        return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)
    file = BytesIO(file_stream)
    # Use send_file with a proper filename and MIME type
    return await send_file(
        file,
        as_attachment=True,
        attachment_filename=doc[0].name,
        mimetype=_mimetype_for_document(doc[0]),
    )