From eeb89d604e62a02922d1426a0db6334a3f6b894b Mon Sep 17 00:00:00 2001 From: Paras Sondhi Date: Fri, 24 Apr 2026 16:33:19 +0530 Subject: [PATCH] feat: route docling parsing through native chunking endpoints (#14218) Resolves #14211 **Background:** Currently, RAGFlow routes all Docling parsing through the standard `/convert/source` endpoint. For large documents, this returns massive, unchunked text that exceeds RAGFlow's internal embedding model context limits, causing pipeline failures. **Solution:** This PR updates the `_parse_pdf_remote` ingestion logic in `docling_parser.py` to request `docling-serve`'s native chunking first, by POSTing a payload with `do_chunking` and `chunking_options` to `/v1/convert/source` and `/v1alpha/convert/source`. - By receiving pre-sliced chunk objects directly from Docling, RAGFlow avoids exceeding the embedding model's token limit. - Includes a graceful fallback to the standard (non-chunking) payloads on the same `/convert/source` endpoints, to maintain backwards compatibility for users running older versions of the Docling server that reject the chunking options. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- deepdoc/parser/docling_parser.py | 113 +++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 28 deletions(-) diff --git a/deepdoc/parser/docling_parser.py b/deepdoc/parser/docling_parser.py index a2ebc40025..2e7d475148 100644 --- a/deepdoc/parser/docling_parser.py +++ b/deepdoc/parser/docling_parser.py @@ -44,6 +44,7 @@ except Exception: from deepdoc.parser.utils import extract_pdf_outlines + class DoclingContentType(str, Enum): IMAGE = "image" TABLE = "table" @@ -350,6 +351,13 @@ class DoclingParser(RAGFlowPdfParser): docling_server_url: Optional[str] = None, request_timeout: Optional[int] = None, ): + """ + Parses a PDF document using a remote Docling server. + + Prioritizes native chunking by sending chunking options to /v1/convert/source and + /v1alpha/convert/source to prevent token overflow, with a graceful fallback to the + standard conversion payloads if chunking is unavailable. 
+ """ server_url = self._effective_server_url(docling_server_url) if not server_url: raise RuntimeError("[Docling] DOCLING_SERVER_URL is not configured.") @@ -372,36 +380,48 @@ class DoclingParser(RAGFlowPdfParser): filename = Path(filepath).name or "input.pdf" b64 = base64.b64encode(pdf_bytes).decode("ascii") - v1_payload = { - "options": { - "from_formats": ["pdf"], - "to_formats": ["json", "md", "text"], - }, - "sources": [ - { - "kind": "file", - "filename": filename, - "base64_string": b64, - } - ], + + # Standard payloads + # Standard fallback payloads (no chunking) + v1_payload_standard = { + "options": {"from_formats": ["pdf"], "to_formats": ["json", "md", "text"]}, + "sources": [{"kind": "file", "filename": filename, "base64_string": b64}], } - v1alpha_payload = { - "options": { - "from_formats": ["pdf"], - "to_formats": ["json", "md", "text"], - }, - "file_sources": [ - { - "filename": filename, - "base64_string": b64, - } - ], + v1alpha_payload_standard = { + "options": {"from_formats": ["pdf"], "to_formats": ["json", "md", "text"]}, + "file_sources": [{"filename": filename, "base64_string": b64}], } + + # --- NEW: Correct API Contract for Chunking --- + chunking_opts = { + "from_formats": ["pdf"], + "to_formats": ["json", "md", "text"], + "do_chunking": True, + "chunking_options": { + "max_tokens": 512, + "overlap": 50, + "tokenizer": "sentencepiece" # Required by Docling contract + } + } + v1_payload_chunked = { + "options": chunking_opts, + "sources": [{"kind": "file", "filename": filename, "base64_string": b64}], + } + v1alpha_payload_chunked = { + "options": chunking_opts, + "file_sources": [{"filename": filename, "base64_string": b64}], + } + errors = [] response_json = None - for endpoint, payload in ( - ("/v1/convert/source", v1_payload), - ("/v1alpha/convert/source", v1alpha_payload), + is_chunked_response = False + + # Try chunked endpoints first, then fall back to standard if the server is older + for endpoint, payload, chunk_flag in ( + 
("/v1/convert/source", v1_payload_chunked, True), + ("/v1alpha/convert/source", v1alpha_payload_chunked, True), + ("/v1/convert/source", v1_payload_standard, False), + ("/v1alpha/convert/source", v1alpha_payload_standard, False), ): try: resp = requests.post( @@ -411,20 +431,57 @@ class DoclingParser(RAGFlowPdfParser): ) if resp.status_code < 300: response_json = resp.json() + is_chunked_response = chunk_flag + + if chunk_flag: + self.logger.info(f"[Docling] Successfully used native chunking on: {endpoint}") + else: + self.logger.info(f"[Docling] Chunking unavailable, fell back to standard: {endpoint}") break + + # If chunking request is rejected (e.g., 422 Unprocessable Entity on older servers), + # log it and let the loop naturally fall back to the standard payload. + if chunk_flag: + self.logger.warning(f"[Docling] Server rejected chunking parameters: HTTP {resp.status_code}") + continue + errors.append(f"{endpoint}: HTTP {resp.status_code} {resp.text[:300]}") + except Exception as exc: + self.logger.error(f"[Docling] Request error on {endpoint}: {exc}") errors.append(f"{endpoint}: {exc}") if response_json is None: raise RuntimeError("[Docling] remote convert failed: " + " | ".join(errors)) + sections: list[tuple[str, ...]] = [] + tables = [] + + # --- NEW: Handle Native Chunked Response --- + if is_chunked_response: + # The chunking endpoint returns an array of chunk items + chunks = response_json if isinstance(response_json, list) else response_json.get("results", []) + for chunk_data in chunks: + if not isinstance(chunk_data, dict): + continue + # Depending on the exact docling-serve spec, the text might be nested + chunk_text = chunk_data.get("text", "") + if not chunk_text and isinstance(chunk_data.get("chunk"), dict): + chunk_text = chunk_data["chunk"].get("text", "") + + if isinstance(chunk_text, str) and chunk_text.strip(): + # Feed the pre-sliced chunks directly into RAGFlow's expected format + sections.extend(self._sections_from_remote_text(chunk_text, 
parse_method=parse_method)) + + if callback: + callback(0.95, f"[Docling] Native chunks received: {len(sections)}") + return sections, tables + + # --- FALLBACK: Standard RAGFlow parsing for older docling servers --- docs = self._extract_remote_document_entries(response_json) if not docs: raise RuntimeError("[Docling] remote response does not contain parsed documents.") - sections: list[tuple[str, ...]] = [] - tables = [] for doc in docs: md = doc.get("md_content") txt = doc.get("text_content")