From 62f94cd59b505b56df724faba4d4a2a8ce79679a Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 2 Jul 2026 23:22:07 +0800 Subject: [PATCH] Feat: Add knowledge compilation workflows (#16515) ## Summary - Add knowledge compilation template APIs, services, and builtin template seed data - Add advanced knowledge compile structure/artifact/RAPTOR workflow support - Update parsing, dataset/document APIs, and supporting services for compilation workflows --- api/apps/restful_apis/chunk_api.py | 353 +- .../restful_apis/compilation_template_api.py | 54 + .../compilation_template_group_api.py | 172 + api/apps/restful_apis/dataset_api.py | 259 +- api/apps/restful_apis/document_api.py | 67 +- api/apps/restful_apis/file_commit_api.py | 245 +- .../utils/compilation_template_validation.py | 79 + api/apps/services/dataset_api_service.py | 1133 +++++- api/apps/services/document_api_service.py | 14 +- api/db/__init__.py | 53 +- api/db/db_models.py | 195 +- api/db/init_data.py | 24 +- .../compilation_templates/artifacts.yaml | 136 + .../compilation_templates/empty.yaml | 17 + .../knowledge_graph.yaml | 74 + .../compilation_templates/mind_map.yaml | 69 + .../compilation_templates/page_index.yaml | 33 + .../compilation_templates/timeline.yaml | 48 + .../init_data/compilation_templates/tree.yaml | 31 + .../compilation_template_group_service.py | 394 ++ .../services/compilation_template_service.py | 231 ++ api/db/services/dialog_service.py | 3 +- api/db/services/document_service.py | 111 +- api/db/services/file_commit_service.py | 523 ++- api/db/services/knowledgebase_service.py | 110 +- .../pipeline_operation_log_service.py | 16 +- api/db/services/task_service.py | 188 +- api/utils/validation_utils.py | 22 +- common/constants.py | 4 +- conf/infinity_mapping.json | 34 +- deepdoc/parser/pdf_parser.py | 12 +- rag/advanced_rag/knowlege_compile/__init__.py | 43 + rag/advanced_rag/knowlege_compile/_common.py | 913 +++++ .../knowlege_compile/dataset_nav.py | 437 ++ .../knowlege_compile}/mind_map_extractor.py | 47 +- .../knowlege_compile}/raptor.py | 335 +- .../knowlege_compile/structure.py | 1637 ++++++++ rag/advanced_rag/knowlege_compile/wiki.py | 3576 +++++++++++++++++ rag/flow/extractor/extractor.py | 57 +- rag/nlp/search.py | 229 +- rag/svr/task_executor.py | 25 +- .../chunk_post_processor.py | 901 ++++- .../dataset_skill_generator.py | 588 +++ .../dataset_wiki_generator.py | 805 ++++ .../task_executor_refactor/raptor_service.py | 537 ++- .../task_executor_refactor/raptor_utils.py | 27 +- .../task_executor_refactor/task_handler.py | 517 ++- rag/utils/es_conn.py | 138 +- rag/utils/opensearch_conn.py | 187 +- rag/utils/redis_conn.py | 8 + .../test_file_commit_routes_unit.py | 12 +- .../rag/graphrag/test_checkpoint_resume.py | 2 +- .../test_chunk_post_processor.py | 438 -- .../test_chunk_service.py | 318 -- .../test_task_handler.py | 316 -- .../test_task_handler_integration.py | 856 ---- .../rag/test_raptor_psi_tree_builder.py | 28 +- 57 files changed, 14587 insertions(+), 3094 deletions(-) create mode 100644 api/apps/restful_apis/compilation_template_api.py create mode 100644 api/apps/restful_apis/compilation_template_group_api.py create mode 100644 api/apps/restful_apis/utils/compilation_template_validation.py create mode 100644 api/db/init_data/compilation_templates/artifacts.yaml create mode 100644 api/db/init_data/compilation_templates/empty.yaml create mode 100644 api/db/init_data/compilation_templates/knowledge_graph.yaml create mode 100644 api/db/init_data/compilation_templates/mind_map.yaml create mode 100644 api/db/init_data/compilation_templates/page_index.yaml create mode 100644 api/db/init_data/compilation_templates/timeline.yaml create mode 100644 api/db/init_data/compilation_templates/tree.yaml create mode 100644 api/db/services/compilation_template_group_service.py create mode 100644 api/db/services/compilation_template_service.py create mode 100644 rag/advanced_rag/knowlege_compile/__init__.py create mode 100644 rag/advanced_rag/knowlege_compile/_common.py create mode 100644 rag/advanced_rag/knowlege_compile/dataset_nav.py rename rag/{graphrag/general => advanced_rag/knowlege_compile}/mind_map_extractor.py (79%) rename rag/{ => advanced_rag/knowlege_compile}/raptor.py (68%) create mode 100644 rag/advanced_rag/knowlege_compile/structure.py create mode 100644 rag/advanced_rag/knowlege_compile/wiki.py create mode 100644 rag/svr/task_executor_refactor/dataset_skill_generator.py create mode 100644 rag/svr/task_executor_refactor/dataset_wiki_generator.py delete mode 100644 test/unit_test/rag/svr/task_executor_refactor/test_chunk_post_processor.py delete mode 100644 test/unit_test/rag/svr/task_executor_refactor/test_chunk_service.py delete mode 100644 test/unit_test/rag/svr/task_executor_refactor/test_task_handler.py delete mode 100644 test/unit_test/rag/svr/task_executor_refactor/test_task_handler_integration.py diff --git a/api/apps/restful_apis/chunk_api.py b/api/apps/restful_apis/chunk_api.py index 8f16ea28f1..b91519db0c 100644 --- a/api/apps/restful_apis/chunk_api.py +++ b/api/apps/restful_apis/chunk_api.py @@ -16,6 +16,7 @@ import base64 import binascii import datetime +import json import logging import re @@ -54,6 +55,7 @@ from api.utils.reference_metadata_utils import ( ) from common import settings from common.constants import LLMType, ParserType, RetCode, TaskStatus +from common.doc_store.doc_store_base import OrderByExpr from common.metadata_utils import convert_conditions, meta_filter from common.misc_utils import thread_pool_exec from common.string_utils import is_content_empty, remove_redundant_spaces @@ -135,6 +137,19 @@ def _map_doc(doc): return renamed_doc +def _get_query_id_list(args, name: str) -> list[str]: + values = args.getlist(name) if hasattr(args, "getlist") else [args.get(name)] + ids: list[str] = [] + seen: set[str] = set() + for value in values: + for item in str(value or "").split(","): + item = item.strip() + if item and item not in seen: + ids.append(item) + seen.add(item) + return ids + + def _strip_chunk_runtime_fields(chunk): for name in [name for name in chunk.keys() if re.search(r"(_vec$|_sm_|_tks|_ltks)", name)]: del chunk[name] @@ -148,6 +163,15 @@ def _get_dataset_tenant_id(dataset_id): return kb.tenant_id +def _compilation_template_kind(kind) -> str: + if not isinstance(kind, str): + return "" + normalized = kind.strip().lower().replace("-", "_") + if normalized in {"pageindex", "page_index", "knowledge_graph"}: + return "timeline" + return normalized + + def _resolve_reference_metadata(req: dict, search_config: dict | None = None): return resolve_reference_metadata_preferences(req, search_config) @@ -169,7 +193,9 @@ async def parse(tenant_id, dataset_id): if not e: return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") if kb.pipeline_id: - return get_error_data_result(message="Datasets configured with an ingestion pipeline cannot be parsed with `/datasets/{dataset_id}/chunks`. Use `/documents/ingest` instead.", code=RetCode.ARGUMENT_ERROR) + return get_error_data_result( + message="Datasets configured with an ingestion pipeline cannot be parsed with `/datasets/{dataset_id}/chunks`. Use `/documents/ingest` instead.", code=RetCode.ARGUMENT_ERROR + ) req = await get_request_json() if not req.get("document_ids"): return get_error_data_result("`document_ids` is required") @@ -363,9 +389,19 @@ async def retrieval_test(tenant_id): question += await keyword_extraction(LLMBundle(kb.tenant_id, chat_model_config), question) ranks = await settings.retriever.retrieval( - question, embd_mdl, tenant_ids, kb_ids, page, size, similarity_threshold, - vector_similarity_weight, top, doc_ids, rerank_mdl=rerank_mdl, - highlight=highlight, rank_feature=label_question(question, kbs), + question, + embd_mdl, + tenant_ids, + kb_ids, + page, + size, + similarity_threshold, + vector_similarity_weight, + top, + doc_ids, + rerank_mdl=rerank_mdl, + highlight=highlight, + rank_feature=label_question(question, kbs), ) if toc_enhance: chat_model_config = get_tenant_default_model_by_type(kb.tenant_id, LLMType.CHAT) @@ -421,13 +457,17 @@ async def list_chunks(tenant_id, dataset_id, document_id): page = int(req.get("page", 1)) size = validate_rest_api_page_size(int(req.get("page_size", 30))) question = req.get("keywords", "") + chunk_ids = _get_query_id_list(req, "chunk_ids") query = { "doc_ids": [document_id], "page": page, "size": size, "question": question, "sort": True, + "must_not": {"exists": "compile_kwd"}, } + if chunk_ids: + query["id"] = chunk_ids if "available" in req: query["available_int"] = 1 if req["available"] == "true" else 0 @@ -438,6 +478,8 @@ async def list_chunks(tenant_id, dataset_id, document_id): return get_result(message=f"Chunk not found: {dataset_id}/{req.get('id')}", code=RetCode.DATA_ERROR) if str(chunk.get("doc_id", chunk.get("document_id"))) != str(document_id): return get_result(message=f"Chunk not found: {dataset_id}/{req.get('id')}", code=RetCode.DATA_ERROR) + if chunk.get("compile_kwd"): + return get_result(message=f"Chunk not found: {dataset_id}/{req.get('id')}", code=RetCode.DATA_ERROR) _strip_chunk_runtime_fields(chunk) res["total"] = 1 final_chunk = { @@ -468,11 +510,7 @@ async def list_chunks(tenant_id, dataset_id, document_id): for chunk_id in sres.ids: d = { "id": chunk_id, - "content": ( - remove_redundant_spaces(sres.highlight[chunk_id]) - if question and chunk_id in sres.highlight - else sres.field[chunk_id].get("content_with_weight", "") - ), + "content": (remove_redundant_spaces(sres.highlight[chunk_id]) if question and chunk_id in sres.highlight else sres.field[chunk_id].get("content_with_weight", "")), "document_id": sres.field[chunk_id]["doc_id"], "docnm_kwd": sres.field[chunk_id]["docnm_kwd"], "important_keywords": sres.field[chunk_id].get("important_kwd", []), @@ -506,6 +544,8 @@ async def get_chunk(tenant_id, dataset_id, document_id, chunk_id): chunk = settings.docStoreConn.get(chunk_id, search.index_name(dataset_tenant_id), [dataset_id]) if chunk is None or str(chunk.get("doc_id", chunk.get("document_id"))) != str(document_id): return get_result(data=False, message="Chunk not found!", code=RetCode.DATA_ERROR) + if chunk.get("compile_kwd"): + return get_result(data=False, message="Chunk not found!", code=RetCode.DATA_ERROR) return get_result(data=_strip_chunk_runtime_fields(chunk)) except Exception as e: if str(e).find("NotFoundError") >= 0: @@ -513,6 +553,292 @@ async def get_chunk(tenant_id, dataset_id, document_id, chunk_id): return server_error_response(e) +@manager.route("/datasets//documents//structure/graph", methods=["GET"]) # noqa: F821 +@login_required +@add_tenant_id_to_kwargs +async def get_document_structure_graph(tenant_id, dataset_id, document_id): + """Return per-template structure graphs for a document. + + Response shape:: + + { + "templates": [ + { + "template_id": " | 'legacy:'", + "template_name": "", + "kind": "list | set | hypergraph | timeline | page_index | …", + "entities": [...], + "relations": [...] + }, + ... + ] + } + + Rows that pre-date the ``compilation_template_ids`` stamp are surfaced + under a synthetic ``legacy:`` bucket so an in-flight + migration doesn't drop their data on the floor. Empty templates + (zero entities AND zero relations) are filtered out. + """ + from rag.nlp import search + from api.db.services.compilation_template_group_service import CompilationTemplateGroupService + + if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): + return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") + dataset_tenant_id = _get_dataset_tenant_id(dataset_id) + if not dataset_tenant_id: + return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") + docs = DocumentService.query(id=document_id, kb_id=dataset_id) + if not docs: + return get_error_data_result(message=f"You don't own the document {document_id}.") + + # Resolve the doc's configured template group → child template ids + # so we can render tabs in the order the user picked them. + # Artifacts-kind templates render on the dataset Artifact tab, not + # here, so they're filtered out. + parser_config = docs[0].parser_config or {} + + def _group_ids(raw) -> list[str]: + if isinstance(raw, str): + raw = [raw] + if not isinstance(raw, list): + return [] + ids: list[str] = [] + seen: set[str] = set() + for gid in raw: + if not isinstance(gid, str): + continue + gid = gid.strip() + if gid and gid not in seen: + seen.add(gid) + ids.append(gid) + return ids + + group_ids: list[str] = [] + if isinstance(parser_config, dict): + if "compilation_template_group_id" in parser_config: + group_ids = _group_ids(parser_config.get("compilation_template_group_id")) + elif isinstance(parser_config.get("ext"), dict): + group_ids = _group_ids(parser_config["ext"].get("compilation_template_group_id")) + + configured_ids: list[str] = [] + seen_configured_ids: set[str] = set() + template_meta: dict[str, dict] = {} + template_meta_by_kind: dict[str, list[dict]] = {} + for group_id in group_ids: + group = CompilationTemplateGroupService.get_saved(group_id, tenant_id) + if not group: + continue + for template in group.get("templates") or []: + if not isinstance(template, dict): + continue + template_id = str(template.get("id") or "").strip() + if not template_id or template_id in seen_configured_ids: + continue + config = template.get("config") if isinstance(template.get("config"), dict) else {} + raw_kind = (config.get("kind") if isinstance(config, dict) else "") or template.get("kind") or "" + kind_norm = _compilation_template_kind(raw_kind) + if kind_norm == "artifacts": + continue + seen_configured_ids.add(template_id) + configured_ids.append(template_id) + meta = { + "template_id": template_id, + "template_name": template.get("name") or template_id, + "kind": raw_kind or kind_norm, + "kind_norm": kind_norm, + } + template_meta[template_id] = meta + template_meta_by_kind.setdefault(kind_norm, []).append(meta) + + # Load every graph row for this doc in one shot. Each row corresponds + # to one (compile_kwd, template_id) tuple — written by + # ``_struct_upsert_graph_json``. + index_name = search.index_name(dataset_tenant_id) + fields = [ + "content_with_weight", + "compile_kwd", + "compilation_template_ids", + "compilation_template_kind_kwd", + ] + try: + res = await thread_pool_exec( + settings.docStoreConn.search, + fields, + [], + {"doc_id": [document_id], "knowledge_graph_kwd": ["graph"]}, + [], + OrderByExpr(), + 0, + 1000, + index_name, + [dataset_id], + ) + rows = settings.docStoreConn.get_fields(res, fields) + + # The RAPTOR graph row is identified by ``compile_kwd`` + # alone — it intentionally doesn't carry ``knowledge_graph_kwd`` + # (which belongs to the KG feature). Query it separately and + # union into the same bucket map below. + res_raptor = await thread_pool_exec( + settings.docStoreConn.search, + fields, + [], + {"doc_id": [document_id], "compile_kwd": ["raptor_graph"]}, + [], + OrderByExpr(), + 0, + 16, + index_name, + [dataset_id], + ) + raptor_rows = settings.docStoreConn.get_fields(res_raptor, fields) + except Exception as e: + return server_error_response(e) + + # Merge the two field-maps so the grouping loop below treats them + # identically. Raptor rows clobber by id, which is fine — both + # sources produce stable per-row ids. + if raptor_rows: + rows = dict(rows or {}) + rows.update(raptor_rows) + + def _row_template_id(row: dict) -> str | None: + raw = row.get("compilation_template_ids") + if isinstance(raw, list): + for v in raw: + if isinstance(v, str) and v.strip(): + return v.strip() + if isinstance(raw, str) and raw.strip(): + return raw.strip() + return None + + # Group: template_id → {entities, relations, kind} + grouped: dict[str, dict] = {} + for row in (rows or {}).values(): + graph = {} + try: + graph = json.loads(row.get("content_with_weight") or "{}") + except Exception: + continue + if not isinstance(graph, dict): + continue + entities = graph.get("entities") or [] + relations = graph.get("relations") or [] + if not entities and not relations: + continue + + tid = _row_template_id(row) + compile_kwd_val = row.get("compile_kwd") or "" + kind_val = row.get("compilation_template_kind_kwd") or compile_kwd_val + + # The RAPTOR graph row has no ``compilation_template_ids`` (it + # isn't derived from a user-authored template). Treat it as its + # own first-class bucket, not a legacy fallback. + is_raptor = compile_kwd_val == "raptor_graph" + + if tid: + bucket_id = tid + row_kind_norm = _compilation_template_kind(kind_val) + meta = template_meta.get(bucket_id) + if not meta: + kind_matches = template_meta_by_kind.get(row_kind_norm) or [] + if len(kind_matches) == 1: + meta = kind_matches[0] + bucket_name = (meta or {}).get("template_name") or bucket_id + bucket_kind = (meta or {}).get("kind") or kind_val + elif is_raptor: + bucket_id = "raptor" + bucket_name = "RAPTOR Summary" + bucket_kind = "raptor" + else: + # Legacy row: synthesize a stable id keyed by compile_kwd so + # multiple legacy kinds (e.g. ``list`` + ``hypergraph``) on + # the same doc surface as separate tabs. + bucket_id = f"legacy:{compile_kwd_val}" + bucket_name = f"Legacy ({compile_kwd_val})" + bucket_kind = kind_val + + if bucket_id not in grouped: + grouped[bucket_id] = { + "template_id": bucket_id, + "template_name": bucket_name, + "kind": bucket_kind, + "entities": [], + "relations": [], + } + grouped[bucket_id]["entities"].extend(entities) + grouped[bucket_id]["relations"].extend(relations) + + # Order: configured templates first (in the user's chosen order), + # then any legacy buckets after. + ordered_ids: list[str] = [] + for tid in configured_ids: + if tid in grouped and tid not in ordered_ids: + ordered_ids.append(tid) + for bucket_id in grouped.keys(): + if bucket_id not in ordered_ids: + ordered_ids.append(bucket_id) + + templates_out = [grouped[bid] for bid in ordered_ids if grouped[bid]["entities"] or grouped[bid]["relations"]] + return get_result(data={"templates": templates_out}) + + +@manager.route("/datasets//documents//structure/graph", methods=["DELETE"]) # noqa: F821 +@login_required +@add_tenant_id_to_kwargs +async def delete_document_structure_graph(tenant_id, dataset_id, document_id): + """Delete one structure-graph tab for a document. + + Request body:: + + {"template_id": "