Files
ragflow/api/apps/restful_apis/chunk_api.py
Kevin Hu 62f94cd59b Feat: Add knowledge compilation workflows (#16515)
## Summary
- Add knowledge compilation template APIs, services, and builtin
template seed data
- Add advanced knowledge compile structure/artifact/RAPTOR workflow
support
- Update parsing, dataset/document APIs, and supporting services for
compilation workflows
2026-07-02 23:22:07 +08:00

1111 lines
48 KiB
Python

#
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import binascii
import datetime
import json
import logging
import re
import xxhash
from pydantic import BaseModel, Field, validator
from quart import request
from api.apps import login_required
from api.db.joint_services.tenant_model_service import (
split_model_name,
get_model_config_from_provider_instance,
get_tenant_default_model_by_type,
)
from api.db.db_models import Document, Task
from api.db.services.doc_metadata_service import DocMetadataService
from api.db.services.document_service import DocumentService
from api.db.services.file2document_service import File2DocumentService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.llm_service import LLMBundle
from api.db.services.task_service import TaskService, cancel_all_task_of, queue_tasks
from api.db.services.tenant_llm_service import TenantLLMService
from api.utils.api_utils import (
add_tenant_id_to_kwargs,
check_duplicate_ids,
construct_json_result,
get_error_data_result,
get_request_json,
get_result,
server_error_response,
)
from api.utils.pagination_utils import validate_rest_api_page_size
from api.utils.image_utils import store_chunk_image
from api.utils.reference_metadata_utils import (
enrich_chunks_with_document_metadata,
resolve_reference_metadata_preferences,
)
from common import settings
from common.constants import LLMType, ParserType, RetCode, TaskStatus
from common.doc_store.doc_store_base import OrderByExpr
from common.metadata_utils import convert_conditions, meta_filter
from common.misc_utils import thread_pool_exec
from common.string_utils import is_content_empty, remove_redundant_spaces
from common.tag_feature_utils import validate_tag_features
from rag.app.tag import label_question
from rag.nlp import search
from rag.prompts.generator import cross_languages, keyword_extraction
DOC_STOP_PARSING_INVALID_STATE_MESSAGE = "Can't stop parsing document that has not started or already completed"
DOC_STOP_PARSING_INVALID_STATE_ERROR_CODE = "DOC_STOP_PARSING_INVALID_STATE"
def _decode_chunk_image_base64(image_base64):
if not isinstance(image_base64, str) or not image_base64.strip():
return None, "`image_base64` must be a non-empty string"
try:
image_binary = base64.b64decode(image_base64, validate=True)
except (binascii.Error, ValueError):
return None, "Invalid `image_base64`"
if not image_binary:
return None, "`image_base64` is empty"
return image_binary, None
def _store_chunk_image_or_error(dataset_id, chunk_id, image_binary):
try:
store_chunk_image(dataset_id, chunk_id, image_binary)
except Exception:
logging.exception(
"Failed to store chunk image. dataset_id=%s chunk_id=%s",
dataset_id,
chunk_id,
)
return "Failed to store chunk image"
return None
class Chunk(BaseModel):
id: str = ""
content: str = ""
document_id: str = ""
docnm_kwd: str = ""
important_keywords: list = Field(default_factory=list)
tag_kwd: list = Field(default_factory=list)
questions: list = Field(default_factory=list)
question_tks: str = ""
image_id: str = ""
available: bool = True
positions: list[list[int]] = Field(default_factory=list)
@validator("positions")
def validate_positions(cls, value):
for sublist in value:
if len(sublist) != 5:
raise ValueError("Each sublist in positions must have a length of 5")
return value
def _map_doc(doc):
key_mapping = {
"chunk_num": "chunk_count",
"kb_id": "dataset_id",
"token_num": "token_count",
"parser_id": "chunk_method",
}
run_mapping = {
"0": "UNSTART",
"1": "RUNNING",
"2": "CANCEL",
"3": "DONE",
"4": "FAIL",
}
renamed_doc = {}
for key, value in doc.to_dict().items():
renamed_doc[key_mapping.get(key, key)] = value
if key == "run":
renamed_doc["run"] = run_mapping.get(str(value))
return renamed_doc
def _get_query_id_list(args, name: str) -> list[str]:
values = args.getlist(name) if hasattr(args, "getlist") else [args.get(name)]
ids: list[str] = []
seen: set[str] = set()
for value in values:
for item in str(value or "").split(","):
item = item.strip()
if item and item not in seen:
ids.append(item)
seen.add(item)
return ids
def _strip_chunk_runtime_fields(chunk):
for name in [name for name in chunk.keys() if re.search(r"(_vec$|_sm_|_tks|_ltks)", name)]:
del chunk[name]
return chunk
def _get_dataset_tenant_id(dataset_id):
ok, kb = KnowledgebaseService.get_by_id(dataset_id)
if not ok:
return None
return kb.tenant_id
def _compilation_template_kind(kind) -> str:
if not isinstance(kind, str):
return ""
normalized = kind.strip().lower().replace("-", "_")
if normalized in {"pageindex", "page_index", "knowledge_graph"}:
return "timeline"
return normalized
def _resolve_reference_metadata(req: dict, search_config: dict | None = None):
return resolve_reference_metadata_preferences(req, search_config)
def _enrich_chunks_with_document_metadata(chunks: list[dict], metadata_fields=None) -> None:
enrich_chunks_with_document_metadata(chunks, metadata_fields)
@manager.route("/datasets/<dataset_id>/chunks", methods=["POST"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def parse(tenant_id, dataset_id):
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
e, kb = KnowledgebaseService.get_by_id(dataset_id)
if not e:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
if kb.pipeline_id:
return get_error_data_result(
message="Datasets configured with an ingestion pipeline cannot be parsed with `/datasets/{dataset_id}/chunks`. Use `/documents/ingest` instead.", code=RetCode.ARGUMENT_ERROR
)
req = await get_request_json()
if not req.get("document_ids"):
return get_error_data_result("`document_ids` is required")
doc_list = req.get("document_ids")
unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
doc_list = unique_doc_ids
not_found = []
success_count = 0
for id in doc_list:
doc = DocumentService.query(id=id, kb_id=dataset_id)
if not doc:
not_found.append(id)
continue
if not doc:
return get_error_data_result(message=f"You don't own the document {id}.")
info = {"run": "1", "progress": 0, "progress_msg": "", "chunk_num": 0, "token_num": 0}
if (
DocumentService.filter_update(
[
Document.id == id,
((Document.run.is_null(True)) | (Document.run != TaskStatus.RUNNING.value)),
],
info,
)
== 0
):
return get_error_data_result("Can't parse document that is currently being processed")
index_name = search.index_name(dataset_tenant_id)
if settings.docStoreConn.index_exist(index_name, doc[0].kb_id):
settings.docStoreConn.delete({"doc_id": id}, index_name, doc[0].kb_id)
else:
logging.info(
"Skipping chunk delete during parse for doc %s: index %s/%s does not exist",
id,
index_name,
doc[0].kb_id,
)
TaskService.filter_delete([Task.doc_id == id])
e, doc = DocumentService.get_by_id(id)
doc = doc.to_dict()
doc["tenant_id"] = tenant_id
bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"])
queue_tasks(doc, bucket, name, 0)
success_count += 1
if not_found:
return get_result(message=f"Documents not found: {not_found}", code=RetCode.DATA_ERROR)
if duplicate_messages:
if success_count > 0:
return get_result(
message=f"Partially parsed {success_count} documents with {len(duplicate_messages)} errors",
data={"success_count": success_count, "errors": duplicate_messages},
)
else:
return get_error_data_result(message=";".join(duplicate_messages))
return get_result()
@manager.route("/datasets/<dataset_id>/chunks", methods=["DELETE"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def stop_parsing(tenant_id, dataset_id):
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
req = await get_request_json()
if not req.get("document_ids"):
return get_error_data_result("`document_ids` is required")
doc_list = req.get("document_ids")
unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
doc_list = unique_doc_ids
success_count = 0
for id in doc_list:
doc = DocumentService.query(id=id, kb_id=dataset_id)
if not doc:
return get_error_data_result(message=f"You don't own the document {id}.")
if doc[0].run != TaskStatus.RUNNING.value:
return construct_json_result(
code=RetCode.DATA_ERROR,
message=DOC_STOP_PARSING_INVALID_STATE_MESSAGE,
data={"error_code": DOC_STOP_PARSING_INVALID_STATE_ERROR_CODE},
)
cancel_all_task_of(id)
info = {"run": "2", "progress": 0, "chunk_num": 0}
DocumentService.update_by_id(id, info)
index_name = search.index_name(dataset_tenant_id)
if settings.docStoreConn.index_exist(index_name, doc[0].kb_id):
settings.docStoreConn.delete({"doc_id": doc[0].id}, index_name, doc[0].kb_id)
else:
logging.info(
"Skipping chunk delete during stop_parsing for doc %s: index %s/%s does not exist",
doc[0].id,
index_name,
doc[0].kb_id,
)
success_count += 1
if duplicate_messages:
if success_count > 0:
return get_result(
message=f"Partially stopped {success_count} documents with {len(duplicate_messages)} errors",
data={"success_count": success_count, "errors": duplicate_messages},
)
else:
return get_error_data_result(message=";".join(duplicate_messages))
return get_result()
@manager.route("/retrieval", methods=["POST"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def retrieval_test(tenant_id):
req = await get_request_json()
if not req.get("dataset_ids"):
return get_error_data_result("`dataset_ids` is required.")
kb_ids = req["dataset_ids"]
if not isinstance(kb_ids, list):
return get_error_data_result("`dataset_ids` should be a list")
for id in kb_ids:
if not KnowledgebaseService.accessible(kb_id=id, user_id=tenant_id):
return get_error_data_result(f"You don't own the dataset {id}.")
kbs = KnowledgebaseService.get_by_ids(kb_ids)
embd_nms = list(set([split_model_name(kb.embd_id)[0] for kb in kbs]))
if len(embd_nms) != 1:
return get_result(message="Datasets use different embedding models.", code=RetCode.DATA_ERROR)
if "question" not in req:
return get_error_data_result("`question` is required.")
page = int(req.get("page", 1))
size = validate_rest_api_page_size(int(req.get("page_size", 30)))
question = req["question"].strip() if isinstance(req["question"], str) else req["question"]
if not question:
return get_result(data={"total": 0, "chunks": [], "doc_aggs": {}})
doc_ids = req.get("document_ids", [])
use_kg = req.get("use_kg", False)
toc_enhance = req.get("toc_enhance", False)
langs = req.get("cross_languages", [])
if not isinstance(doc_ids, list):
return get_error_data_result("`documents` should be a list")
if doc_ids:
doc_ids_list = KnowledgebaseService.list_documents_by_ids(kb_ids)
for doc_id in doc_ids:
if doc_id not in doc_ids_list:
return get_error_data_result(f"The datasets don't own the document {doc_id}")
if not doc_ids:
metadata_condition = req.get("metadata_condition")
if metadata_condition:
metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids)
doc_ids = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))
if not doc_ids and metadata_condition.get("conditions"):
return get_result(data={"total": 0, "chunks": [], "doc_aggs": {}})
if metadata_condition and not doc_ids:
doc_ids = ["-999"]
else:
doc_ids = None
similarity_threshold = float(req.get("similarity_threshold", 0.2))
vector_similarity_weight = float(req.get("vector_similarity_weight", 0.3))
top = int(req.get("top_k", 1024))
if top <= 0:
return get_error_data_result("`top_k` must be greater than 0")
highlight_val = req.get("highlight", None)
if highlight_val is None:
highlight = False
elif isinstance(highlight_val, bool):
highlight = highlight_val
elif isinstance(highlight_val, str) and highlight_val.lower() in ["true", "false"]:
highlight = highlight_val.lower() == "true"
else:
return get_error_data_result("`highlight` should be a boolean")
include_metadata, metadata_fields = _resolve_reference_metadata(req)
try:
tenant_ids = list(set([kb.tenant_id for kb in kbs]))
e, kb = KnowledgebaseService.get_by_id(kb_ids[0])
if not e:
return get_error_data_result(message="Dataset not found!")
embd_model_config = get_model_config_from_provider_instance(kb.tenant_id, LLMType.EMBEDDING, kb.embd_id)
embd_mdl = LLMBundle(kb.tenant_id, embd_model_config)
rerank_mdl = None
if req.get("rerank_id"):
rerank_model_config = get_model_config_from_provider_instance(kb.tenant_id, LLMType.RERANK, req["rerank_id"])
rerank_mdl = LLMBundle(kb.tenant_id, rerank_model_config)
if langs:
question = await cross_languages(kb.tenant_id, None, question, langs)
if req.get("keyword", False):
chat_model_config = get_tenant_default_model_by_type(kb.tenant_id, LLMType.CHAT)
question += await keyword_extraction(LLMBundle(kb.tenant_id, chat_model_config), question)
ranks = await settings.retriever.retrieval(
question,
embd_mdl,
tenant_ids,
kb_ids,
page,
size,
similarity_threshold,
vector_similarity_weight,
top,
doc_ids,
rerank_mdl=rerank_mdl,
highlight=highlight,
rank_feature=label_question(question, kbs),
)
if toc_enhance:
chat_model_config = get_tenant_default_model_by_type(kb.tenant_id, LLMType.CHAT)
cks = await settings.retriever.retrieval_by_toc(question, ranks["chunks"], tenant_ids, LLMBundle(kb.tenant_id, chat_model_config), size)
if cks:
ranks["chunks"] = cks
ranks["chunks"] = settings.retriever.retrieval_by_children(ranks["chunks"], tenant_ids)
if use_kg:
chat_model_config = get_tenant_default_model_by_type(kb.tenant_id, LLMType.CHAT)
ck = await settings.kg_retriever.retrieval(question, [k.tenant_id for k in kbs], kb_ids, embd_mdl, LLMBundle(kb.tenant_id, chat_model_config))
if ck["content_with_weight"]:
ranks["chunks"].insert(0, ck)
for c in ranks["chunks"]:
c.pop("vector", None)
if include_metadata:
logging.info("sdk.retrieval reference_metadata enabled dataset_ids=%s fields=%s chunks=%s", kb_ids, sorted(metadata_fields) if metadata_fields else None, len(ranks["chunks"]))
enrich_chunks_with_document_metadata(ranks["chunks"], metadata_fields)
key_mapping = {
"chunk_id": "id",
"content_with_weight": "content",
"doc_id": "document_id",
"important_kwd": "important_keywords",
"question_kwd": "questions",
"docnm_kwd": "document_keyword",
"kb_id": "dataset_id",
}
ranks["chunks"] = [{key_mapping.get(key, key): value for key, value in chunk.items()} for chunk in ranks["chunks"]]
return get_result(data=ranks)
except Exception as e:
if "not_found" in str(e):
return get_result(message="No chunk found! Check the chunk status please!", code=RetCode.DATA_ERROR)
return server_error_response(e)
@manager.route("/datasets/<dataset_id>/documents/<document_id>/chunks", methods=["GET"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def list_chunks(tenant_id, dataset_id, document_id):
from rag.nlp import search
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
doc = DocumentService.query(id=document_id, kb_id=dataset_id)
if not doc:
return get_error_data_result(message=f"You don't own the document {document_id}.")
doc = doc[0]
req = request.args
page = int(req.get("page", 1))
size = validate_rest_api_page_size(int(req.get("page_size", 30)))
question = req.get("keywords", "")
chunk_ids = _get_query_id_list(req, "chunk_ids")
query = {
"doc_ids": [document_id],
"page": page,
"size": size,
"question": question,
"sort": True,
"must_not": {"exists": "compile_kwd"},
}
if chunk_ids:
query["id"] = chunk_ids
if "available" in req:
query["available_int"] = 1 if req["available"] == "true" else 0
res = {"total": 0, "chunks": [], "doc": _map_doc(doc)}
if req.get("id"):
chunk = settings.docStoreConn.get(req.get("id"), search.index_name(dataset_tenant_id), [dataset_id])
if not chunk:
return get_result(message=f"Chunk not found: {dataset_id}/{req.get('id')}", code=RetCode.DATA_ERROR)
if str(chunk.get("doc_id", chunk.get("document_id"))) != str(document_id):
return get_result(message=f"Chunk not found: {dataset_id}/{req.get('id')}", code=RetCode.DATA_ERROR)
if chunk.get("compile_kwd"):
return get_result(message=f"Chunk not found: {dataset_id}/{req.get('id')}", code=RetCode.DATA_ERROR)
_strip_chunk_runtime_fields(chunk)
res["total"] = 1
final_chunk = {
"id": chunk.get("id", chunk.get("chunk_id")),
"content": chunk["content_with_weight"],
"document_id": chunk.get("doc_id", chunk.get("document_id")),
"docnm_kwd": chunk["docnm_kwd"],
"important_keywords": chunk.get("important_kwd", []),
"questions": chunk.get("question_kwd", []),
"dataset_id": chunk.get("kb_id", chunk.get("dataset_id")),
"image_id": chunk.get("img_id", ""),
"available": bool(chunk.get("available_int", 1)),
"positions": chunk.get("position_int", []),
"tag_kwd": chunk.get("tag_kwd", []),
"tag_feas": chunk.get("tag_feas", {}),
}
res["chunks"].append(final_chunk)
_ = Chunk(**final_chunk)
elif settings.docStoreConn.index_exist(search.index_name(dataset_tenant_id), dataset_id):
sres = await settings.retriever.search(
query,
search.index_name(dataset_tenant_id),
[dataset_id],
emb_mdl=None,
highlight=True,
)
res["total"] = sres.total
for chunk_id in sres.ids:
d = {
"id": chunk_id,
"content": (remove_redundant_spaces(sres.highlight[chunk_id]) if question and chunk_id in sres.highlight else sres.field[chunk_id].get("content_with_weight", "")),
"document_id": sres.field[chunk_id]["doc_id"],
"docnm_kwd": sres.field[chunk_id]["docnm_kwd"],
"important_keywords": sres.field[chunk_id].get("important_kwd", []),
"tag_kwd": sres.field[chunk_id].get("tag_kwd", []),
"questions": sres.field[chunk_id].get("question_kwd", []),
"dataset_id": sres.field[chunk_id].get("kb_id", sres.field[chunk_id].get("dataset_id")),
"image_id": sres.field[chunk_id].get("img_id", ""),
"available": bool(int(sres.field[chunk_id].get("available_int", "1"))),
"positions": sres.field[chunk_id].get("position_int", []),
}
res["chunks"].append(d)
_ = Chunk(**d)
return get_result(data=res)
@manager.route("/datasets/<dataset_id>/documents/<document_id>/chunks/<chunk_id>", methods=["GET"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def get_chunk(tenant_id, dataset_id, document_id, chunk_id):
from rag.nlp import search
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
doc = DocumentService.query(id=document_id, kb_id=dataset_id)
if not doc:
return get_error_data_result(message=f"You don't own the document {document_id}.")
try:
chunk = settings.docStoreConn.get(chunk_id, search.index_name(dataset_tenant_id), [dataset_id])
if chunk is None or str(chunk.get("doc_id", chunk.get("document_id"))) != str(document_id):
return get_result(data=False, message="Chunk not found!", code=RetCode.DATA_ERROR)
if chunk.get("compile_kwd"):
return get_result(data=False, message="Chunk not found!", code=RetCode.DATA_ERROR)
return get_result(data=_strip_chunk_runtime_fields(chunk))
except Exception as e:
if str(e).find("NotFoundError") >= 0:
return get_result(data=False, message="Chunk not found!", code=RetCode.DATA_ERROR)
return server_error_response(e)
@manager.route("/datasets/<dataset_id>/documents/<document_id>/structure/graph", methods=["GET"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def get_document_structure_graph(tenant_id, dataset_id, document_id):
"""Return per-template structure graphs for a document.
Response shape::
{
"templates": [
{
"template_id": "<id> | 'legacy:<compile_kwd>'",
"template_name": "<display name>",
"kind": "list | set | hypergraph | timeline | page_index | …",
"entities": [...],
"relations": [...]
},
...
]
}
Rows that pre-date the ``compilation_template_ids`` stamp are surfaced
under a synthetic ``legacy:<compile_kwd>`` bucket so an in-flight
migration doesn't drop their data on the floor. Empty templates
(zero entities AND zero relations) are filtered out.
"""
from rag.nlp import search
from api.db.services.compilation_template_group_service import CompilationTemplateGroupService
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
docs = DocumentService.query(id=document_id, kb_id=dataset_id)
if not docs:
return get_error_data_result(message=f"You don't own the document {document_id}.")
# Resolve the doc's configured template group → child template ids
# so we can render tabs in the order the user picked them.
# Artifacts-kind templates render on the dataset Artifact tab, not
# here, so they're filtered out.
parser_config = docs[0].parser_config or {}
def _group_ids(raw) -> list[str]:
if isinstance(raw, str):
raw = [raw]
if not isinstance(raw, list):
return []
ids: list[str] = []
seen: set[str] = set()
for gid in raw:
if not isinstance(gid, str):
continue
gid = gid.strip()
if gid and gid not in seen:
seen.add(gid)
ids.append(gid)
return ids
group_ids: list[str] = []
if isinstance(parser_config, dict):
if "compilation_template_group_id" in parser_config:
group_ids = _group_ids(parser_config.get("compilation_template_group_id"))
elif isinstance(parser_config.get("ext"), dict):
group_ids = _group_ids(parser_config["ext"].get("compilation_template_group_id"))
configured_ids: list[str] = []
seen_configured_ids: set[str] = set()
template_meta: dict[str, dict] = {}
template_meta_by_kind: dict[str, list[dict]] = {}
for group_id in group_ids:
group = CompilationTemplateGroupService.get_saved(group_id, tenant_id)
if not group:
continue
for template in group.get("templates") or []:
if not isinstance(template, dict):
continue
template_id = str(template.get("id") or "").strip()
if not template_id or template_id in seen_configured_ids:
continue
config = template.get("config") if isinstance(template.get("config"), dict) else {}
raw_kind = (config.get("kind") if isinstance(config, dict) else "") or template.get("kind") or ""
kind_norm = _compilation_template_kind(raw_kind)
if kind_norm == "artifacts":
continue
seen_configured_ids.add(template_id)
configured_ids.append(template_id)
meta = {
"template_id": template_id,
"template_name": template.get("name") or template_id,
"kind": raw_kind or kind_norm,
"kind_norm": kind_norm,
}
template_meta[template_id] = meta
template_meta_by_kind.setdefault(kind_norm, []).append(meta)
# Load every graph row for this doc in one shot. Each row corresponds
# to one (compile_kwd, template_id) tuple — written by
# ``_struct_upsert_graph_json``.
index_name = search.index_name(dataset_tenant_id)
fields = [
"content_with_weight",
"compile_kwd",
"compilation_template_ids",
"compilation_template_kind_kwd",
]
try:
res = await thread_pool_exec(
settings.docStoreConn.search,
fields,
[],
{"doc_id": [document_id], "knowledge_graph_kwd": ["graph"]},
[],
OrderByExpr(),
0,
1000,
index_name,
[dataset_id],
)
rows = settings.docStoreConn.get_fields(res, fields)
# The RAPTOR graph row is identified by ``compile_kwd``
# alone — it intentionally doesn't carry ``knowledge_graph_kwd``
# (which belongs to the KG feature). Query it separately and
# union into the same bucket map below.
res_raptor = await thread_pool_exec(
settings.docStoreConn.search,
fields,
[],
{"doc_id": [document_id], "compile_kwd": ["raptor_graph"]},
[],
OrderByExpr(),
0,
16,
index_name,
[dataset_id],
)
raptor_rows = settings.docStoreConn.get_fields(res_raptor, fields)
except Exception as e:
return server_error_response(e)
# Merge the two field-maps so the grouping loop below treats them
# identically. Raptor rows clobber by id, which is fine — both
# sources produce stable per-row ids.
if raptor_rows:
rows = dict(rows or {})
rows.update(raptor_rows)
def _row_template_id(row: dict) -> str | None:
raw = row.get("compilation_template_ids")
if isinstance(raw, list):
for v in raw:
if isinstance(v, str) and v.strip():
return v.strip()
if isinstance(raw, str) and raw.strip():
return raw.strip()
return None
# Group: template_id → {entities, relations, kind}
grouped: dict[str, dict] = {}
for row in (rows or {}).values():
graph = {}
try:
graph = json.loads(row.get("content_with_weight") or "{}")
except Exception:
continue
if not isinstance(graph, dict):
continue
entities = graph.get("entities") or []
relations = graph.get("relations") or []
if not entities and not relations:
continue
tid = _row_template_id(row)
compile_kwd_val = row.get("compile_kwd") or ""
kind_val = row.get("compilation_template_kind_kwd") or compile_kwd_val
# The RAPTOR graph row has no ``compilation_template_ids`` (it
# isn't derived from a user-authored template). Treat it as its
# own first-class bucket, not a legacy fallback.
is_raptor = compile_kwd_val == "raptor_graph"
if tid:
bucket_id = tid
row_kind_norm = _compilation_template_kind(kind_val)
meta = template_meta.get(bucket_id)
if not meta:
kind_matches = template_meta_by_kind.get(row_kind_norm) or []
if len(kind_matches) == 1:
meta = kind_matches[0]
bucket_name = (meta or {}).get("template_name") or bucket_id
bucket_kind = (meta or {}).get("kind") or kind_val
elif is_raptor:
bucket_id = "raptor"
bucket_name = "RAPTOR Summary"
bucket_kind = "raptor"
else:
# Legacy row: synthesize a stable id keyed by compile_kwd so
# multiple legacy kinds (e.g. ``list`` + ``hypergraph``) on
# the same doc surface as separate tabs.
bucket_id = f"legacy:{compile_kwd_val}"
bucket_name = f"Legacy ({compile_kwd_val})"
bucket_kind = kind_val
if bucket_id not in grouped:
grouped[bucket_id] = {
"template_id": bucket_id,
"template_name": bucket_name,
"kind": bucket_kind,
"entities": [],
"relations": [],
}
grouped[bucket_id]["entities"].extend(entities)
grouped[bucket_id]["relations"].extend(relations)
# Order: configured templates first (in the user's chosen order),
# then any legacy buckets after.
ordered_ids: list[str] = []
for tid in configured_ids:
if tid in grouped and tid not in ordered_ids:
ordered_ids.append(tid)
for bucket_id in grouped.keys():
if bucket_id not in ordered_ids:
ordered_ids.append(bucket_id)
templates_out = [grouped[bid] for bid in ordered_ids if grouped[bid]["entities"] or grouped[bid]["relations"]]
return get_result(data={"templates": templates_out})
@manager.route("/datasets/<dataset_id>/documents/<document_id>/structure/graph", methods=["DELETE"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def delete_document_structure_graph(tenant_id, dataset_id, document_id):
"""Delete one structure-graph tab for a document.
Request body::
{"template_id": "<template id> | legacy:<compile_kwd> | raptor"}
Template-backed structure tabs remove both the compact graph row and
the underlying entity/relation rows. RAPTOR only removes the graph
projection row so summary chunks remain available for retrieval.
"""
from rag.nlp import search
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
docs = DocumentService.query(id=document_id, kb_id=dataset_id)
if not docs:
return get_error_data_result(message=f"You don't own the document {document_id}.")
req = await get_request_json()
template_id = str(req.get("template_id") or "").strip()
if not template_id:
return get_error_data_result(message="`template_id` is required")
index_name = search.index_name(dataset_tenant_id)
def _delete(condition: dict) -> int:
return settings.docStoreConn.delete(condition, index_name, dataset_id)
try:
deleted = 0
if template_id == "raptor":
deleted += _delete({"doc_id": [document_id], "compile_kwd": ["raptor_graph"]})
return get_result(data={"deleted": deleted}, message=f"deleted {deleted} structure graph rows")
if template_id.startswith("legacy:"):
compile_kwd = template_id[len("legacy:") :].strip()
if not compile_kwd:
return get_error_data_result(message="`template_id` is invalid")
base_condition = {"doc_id": [document_id], "compile_kwd": [compile_kwd]}
else:
base_condition = {"doc_id": [document_id], "compilation_template_ids": [template_id]}
deleted += _delete({**base_condition, "knowledge_graph_kwd": ["graph"]})
deleted += _delete({**base_condition, "knowledge_graph_kwd": ["entity", "relation"]})
return get_result(data={"deleted": deleted}, message=f"deleted {deleted} structure graph rows")
except Exception as e:
return server_error_response(e)
@manager.route("/datasets/<dataset_id>/documents/<document_id>/chunks", methods=["POST"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def add_chunk(tenant_id, dataset_id, document_id):
from rag.nlp import rag_tokenizer, search
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
doc = DocumentService.query(id=document_id, kb_id=dataset_id)
if not doc:
return get_error_data_result(message=f"You don't own the document {document_id}.")
doc = doc[0]
req = await get_request_json()
if is_content_empty(req.get("content")):
return get_error_data_result(message="`content` is required")
if "important_keywords" in req and not isinstance(req["important_keywords"], list):
return get_error_data_result("`important_keywords` is required to be a list")
if "questions" in req and not isinstance(req["questions"], list):
return get_error_data_result("`questions` is required to be a list")
chunk_id = xxhash.xxh64((req["content"] + document_id).encode("utf-8")).hexdigest()
d = {
"id": chunk_id,
"content_ltks": rag_tokenizer.tokenize(req["content"]),
"content_with_weight": req["content"],
}
d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
d["important_kwd"] = req.get("important_keywords", [])
d["important_tks"] = rag_tokenizer.tokenize(" ".join(req.get("important_keywords", [])))
d["question_kwd"] = [str(q).strip() for q in req.get("questions", []) if str(q).strip()]
d["question_tks"] = rag_tokenizer.tokenize("\n".join(req.get("questions", [])))
d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
d["kb_id"] = dataset_id
d["docnm_kwd"] = doc.name
d["doc_id"] = document_id
if "tag_kwd" in req:
if not isinstance(req["tag_kwd"], list):
return get_error_data_result("`tag_kwd` is required to be a list")
if not all(isinstance(t, str) for t in req["tag_kwd"]):
return get_error_data_result("`tag_kwd` must be a list of strings")
d["tag_kwd"] = req["tag_kwd"]
if "tag_feas" in req:
try:
d["tag_feas"] = validate_tag_features(req["tag_feas"])
except ValueError as exc:
return get_error_data_result(f"`tag_feas` {exc}")
if "image_base64" in req:
image_binary, image_err = _decode_chunk_image_base64(req.get("image_base64"))
if image_err:
return get_error_data_result(message=image_err)
store_err = _store_chunk_image_or_error(dataset_id, chunk_id, image_binary)
if store_err:
return get_error_data_result(message=store_err)
d["img_id"] = f"{dataset_id}-{chunk_id}"
d["doc_type_kwd"] = "image"
embd_id = DocumentService.get_embd_id(document_id)
model_config = get_model_config_from_provider_instance(dataset_tenant_id, LLMType.EMBEDDING.value, embd_id)
embd_mdl = TenantLLMService.model_instance(model_config)
v, c = embd_mdl.encode([doc.name, req["content"] if not d["question_kwd"] else "\n".join(d["question_kwd"])])
v = 0.1 * v[0] + 0.9 * v[1]
d[f"q_{len(v)}_vec"] = v.tolist()
settings.docStoreConn.insert([d], search.index_name(dataset_tenant_id), dataset_id)
DocumentService.increment_chunk_num(doc.id, doc.kb_id, c, 1, 0)
key_mapping = {
"id": "id",
"content_with_weight": "content",
"doc_id": "document_id",
"important_kwd": "important_keywords",
"tag_kwd": "tag_kwd",
"question_kwd": "questions",
"kb_id": "dataset_id",
"create_timestamp_flt": "create_timestamp",
"create_time": "create_time",
"document_keyword": "document",
"img_id": "image_id",
}
renamed_chunk = {new_key: d[key] for key, new_key in key_mapping.items() if key in d}
_ = Chunk(**renamed_chunk)
return get_result(data={"chunk": renamed_chunk})
@manager.route("/datasets/<dataset_id>/documents/<document_id>/chunks", methods=["DELETE"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def rm_chunk(tenant_id, dataset_id, document_id):
from rag.nlp import search
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
docs = DocumentService.query(id=document_id, kb_id=dataset_id)
if not docs:
return get_error_data_result(message=f"You don't own the document {document_id}.")
req = await get_request_json()
if not req:
return get_result()
chunk_ids = req.get("chunk_ids")
if not chunk_ids:
if req.get("delete_all") is True:
doc = docs[0]
DocumentService.delete_chunk_images(doc, dataset_tenant_id)
chunk_number = settings.docStoreConn.delete(
{"doc_id": document_id, "must_not": {"exists": "compile_kwd"}},
search.index_name(dataset_tenant_id),
dataset_id,
)
if chunk_number != 0:
DocumentService.decrement_chunk_num(document_id, dataset_id, 1, chunk_number, 0)
return get_result(message=f"deleted {chunk_number} chunks")
return get_result()
unique_chunk_ids, duplicate_messages = check_duplicate_ids(chunk_ids, "chunk")
chunk_number = settings.docStoreConn.delete(
{"doc_id": document_id, "id": unique_chunk_ids, "must_not": {"exists": "compile_kwd"}},
search.index_name(dataset_tenant_id),
dataset_id,
)
if chunk_number != 0:
DocumentService.decrement_chunk_num(document_id, dataset_id, 1, chunk_number, 0)
if chunk_number != len(unique_chunk_ids):
if len(unique_chunk_ids) == 0:
return get_result(message=f"deleted {chunk_number} chunks")
return get_error_data_result(message=f"rm_chunk deleted chunks {chunk_number}, expect {len(unique_chunk_ids)}")
if duplicate_messages:
return get_result(
message=f"Partially deleted {chunk_number} chunks with {len(duplicate_messages)} errors",
data={"success_count": chunk_number, "errors": duplicate_messages},
)
return get_result(message=f"deleted {chunk_number} chunks")
@manager.route("/datasets/<dataset_id>/documents/<document_id>/chunks/<chunk_id>", methods=["PATCH"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def update_chunk(tenant_id, dataset_id, document_id, chunk_id):
from rag.app.qa import beAdoc, rmPrefix
from rag.nlp import rag_tokenizer, search
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
doc = DocumentService.query(id=document_id, kb_id=dataset_id)
if not doc:
return get_error_data_result(message=f"You don't own the document {document_id}.")
doc = doc[0]
chunk = settings.docStoreConn.get(chunk_id, search.index_name(dataset_tenant_id), [dataset_id])
if chunk is None or str(chunk.get("doc_id", chunk.get("document_id"))) != str(document_id):
return get_error_data_result(f"Can't find this chunk {chunk_id}")
req = await get_request_json()
content = req.get("content")
if content is not None:
if is_content_empty(content):
return get_error_data_result(message="`content` is required")
else:
content = chunk.get("content_with_weight", "")
d = {"id": chunk_id, "content_with_weight": content}
d["content_ltks"] = rag_tokenizer.tokenize(d["content_with_weight"])
d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
if "important_keywords" in req:
if not isinstance(req["important_keywords"], list):
return get_error_data_result("`important_keywords` should be a list")
d["important_kwd"] = req.get("important_keywords", [])
d["important_tks"] = rag_tokenizer.tokenize(" ".join(req["important_keywords"]))
if "questions" in req:
if not isinstance(req["questions"], list):
return get_error_data_result("`questions` should be a list")
d["question_kwd"] = [str(q).strip() for q in req.get("questions", []) if str(q).strip()]
d["question_tks"] = rag_tokenizer.tokenize("\n".join(req["questions"]))
if "available" in req:
d["available_int"] = int(req["available"])
if "positions" in req:
if not isinstance(req["positions"], list):
return get_error_data_result("`positions` should be a list")
d["position_int"] = req["positions"]
if "tag_kwd" in req:
if not isinstance(req["tag_kwd"], list):
return get_error_data_result("`tag_kwd` should be a list")
if not all(isinstance(t, str) for t in req["tag_kwd"]):
return get_error_data_result("`tag_kwd` must be a list of strings")
d["tag_kwd"] = req["tag_kwd"]
if "tag_feas" in req:
try:
d["tag_feas"] = validate_tag_features(req["tag_feas"])
except ValueError as exc:
return get_error_data_result(f"`tag_feas` {exc}")
if "image_base64" in req:
image_binary, image_err = _decode_chunk_image_base64(req.get("image_base64"))
if image_err:
return get_error_data_result(message=image_err)
store_err = _store_chunk_image_or_error(dataset_id, chunk_id, image_binary)
if store_err:
return get_error_data_result(message=store_err)
d["img_id"] = f"{dataset_id}-{chunk_id}"
d["doc_type_kwd"] = "image"
embd_id = DocumentService.get_embd_id(document_id)
model_config = get_model_config_from_provider_instance(dataset_tenant_id, LLMType.EMBEDDING.value, embd_id)
embd_mdl = TenantLLMService.model_instance(model_config)
if doc.parser_id == ParserType.QA:
arr = [t for t in re.split(r"[\n\t]", d["content_with_weight"]) if len(t) > 1]
if len(arr) != 2:
return get_error_data_result(message="Q&A must be separated by TAB/ENTER key.")
q, a = rmPrefix(arr[0]), rmPrefix(arr[1])
d = beAdoc(d, arr[0], arr[1], not any([rag_tokenizer.is_chinese(t) for t in q + a]))
v, _ = embd_mdl.encode(
[
doc.name,
d["content_with_weight"] if not d.get("question_kwd") else "\n".join(d["question_kwd"]),
]
)
v = 0.1 * v[0] + 0.9 * v[1] if doc.parser_id != ParserType.QA else v[1]
d[f"q_{len(v)}_vec"] = v.tolist()
settings.docStoreConn.update({"id": chunk_id}, d, search.index_name(dataset_tenant_id), dataset_id)
return get_result()
@manager.route("/datasets/<dataset_id>/documents/<document_id>/chunks", methods=["PATCH"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def switch_chunks(tenant_id, dataset_id, document_id):
from rag.nlp import search
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
dataset_tenant_id = _get_dataset_tenant_id(dataset_id)
if not dataset_tenant_id:
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
req = await get_request_json()
if not req.get("chunk_ids"):
return get_error_data_result(message="`chunk_ids` is required.")
if "available_int" not in req and "available" not in req:
return get_error_data_result(message="`available_int` or `available` is required.")
available_int = int(req["available_int"]) if "available_int" in req else (1 if req.get("available") else 0)
try:
def _switch_sync():
e, doc = DocumentService.get_by_id(document_id)
if not e:
return get_error_data_result(message="Document not found!")
if not doc or str(doc.kb_id) != str(dataset_id):
return get_error_data_result(message="Document not found!")
for cid in req["chunk_ids"]:
if not settings.docStoreConn.update(
{"id": cid},
{"available_int": available_int},
search.index_name(dataset_tenant_id),
doc.kb_id,
):
return get_error_data_result(message="Index updating failure")
return get_result(data=True)
return await thread_pool_exec(_switch_sync)
except Exception as e:
return server_error_response(e)