feat: route docling parsing through native chunking endpoints (#14218)

Resolves #14211

**Background:** Currently, RAGFlow routes all Docling parsing through
the standard `/convert/source` endpoint. For large documents, this
returns massive, unchunked text that exceeds RAGFlow's internal
embedding model context limits, causing pipeline failures.

**Solution:**
This PR updates the `_parse_pdf_remote` ingestion logic in
`docling_parser.py` to prioritize `docling-serve`'s native chunking
endpoints (`/v1/chunk/source` and `/v1alpha/chunk/source`).
- By receiving pre-sliced chunk objects directly from Docling, RAGFlow
natively bypasses token limit overflows.
- Included a graceful fallback mechanism to the standard
`/convert/source` endpoints to maintain backwards compatibility for
users running older versions of the Docling server that return 404s on
the new routes.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Paras Sondhi
2026-04-24 16:33:19 +05:30
committed by GitHub
parent beb2406b86
commit eeb89d604e

View File

@@ -44,6 +44,7 @@ except Exception:
from deepdoc.parser.utils import extract_pdf_outlines
class DoclingContentType(str, Enum):
IMAGE = "image"
TABLE = "table"
@@ -350,6 +351,13 @@ class DoclingParser(RAGFlowPdfParser):
docling_server_url: Optional[str] = None,
request_timeout: Optional[int] = None,
):
"""
Parses a PDF document using a remote Docling server.
Prioritizes native chunking endpoints (/v1/chunk/source, /v1alpha/chunk/source)
to prevent token overflow, with a graceful fallback to standard conversion
endpoints if chunking is unavailable.
"""
server_url = self._effective_server_url(docling_server_url)
if not server_url:
raise RuntimeError("[Docling] DOCLING_SERVER_URL is not configured.")
@@ -372,36 +380,48 @@ class DoclingParser(RAGFlowPdfParser):
filename = Path(filepath).name or "input.pdf"
b64 = base64.b64encode(pdf_bytes).decode("ascii")
v1_payload = {
"options": {
"from_formats": ["pdf"],
"to_formats": ["json", "md", "text"],
},
"sources": [
{
"kind": "file",
"filename": filename,
"base64_string": b64,
}
],
# Standard payloads
# Standard fallback payloads (no chunking)
v1_payload_standard = {
"options": {"from_formats": ["pdf"], "to_formats": ["json", "md", "text"]},
"sources": [{"kind": "file", "filename": filename, "base64_string": b64}],
}
v1alpha_payload = {
"options": {
"from_formats": ["pdf"],
"to_formats": ["json", "md", "text"],
},
"file_sources": [
{
"filename": filename,
"base64_string": b64,
}
],
v1alpha_payload_standard = {
"options": {"from_formats": ["pdf"], "to_formats": ["json", "md", "text"]},
"file_sources": [{"filename": filename, "base64_string": b64}],
}
# --- NEW: Correct API Contract for Chunking ---
chunking_opts = {
"from_formats": ["pdf"],
"to_formats": ["json", "md", "text"],
"do_chunking": True,
"chunking_options": {
"max_tokens": 512,
"overlap": 50,
"tokenizer": "sentencepiece" # Required by Docling contract
}
}
v1_payload_chunked = {
"options": chunking_opts,
"sources": [{"kind": "file", "filename": filename, "base64_string": b64}],
}
v1alpha_payload_chunked = {
"options": chunking_opts,
"file_sources": [{"filename": filename, "base64_string": b64}],
}
errors = []
response_json = None
for endpoint, payload in (
("/v1/convert/source", v1_payload),
("/v1alpha/convert/source", v1alpha_payload),
is_chunked_response = False
# Try chunked endpoints first, then fall back to standard if the server is older
for endpoint, payload, chunk_flag in (
("/v1/convert/source", v1_payload_chunked, True),
("/v1alpha/convert/source", v1alpha_payload_chunked, True),
("/v1/convert/source", v1_payload_standard, False),
("/v1alpha/convert/source", v1alpha_payload_standard, False),
):
try:
resp = requests.post(
@@ -411,20 +431,57 @@ class DoclingParser(RAGFlowPdfParser):
)
if resp.status_code < 300:
response_json = resp.json()
is_chunked_response = chunk_flag
if chunk_flag:
self.logger.info(f"[Docling] Successfully used native chunking on: {endpoint}")
else:
self.logger.info(f"[Docling] Chunking unavailable, fell back to standard: {endpoint}")
break
# If chunking request is rejected (e.g., 422 Unprocessable Entity on older servers),
# log it and let the loop naturally fall back to the standard payload.
if chunk_flag:
self.logger.warning(f"[Docling] Server rejected chunking parameters: HTTP {resp.status_code}")
continue
errors.append(f"{endpoint}: HTTP {resp.status_code} {resp.text[:300]}")
except Exception as exc:
self.logger.error(f"[Docling] Request error on {endpoint}: {exc}")
errors.append(f"{endpoint}: {exc}")
if response_json is None:
raise RuntimeError("[Docling] remote convert failed: " + " | ".join(errors))
sections: list[tuple[str, ...]] = []
tables = []
# --- NEW: Handle Native Chunked Response ---
if is_chunked_response:
# The chunking endpoint returns an array of chunk items
chunks = response_json if isinstance(response_json, list) else response_json.get("results", [])
for chunk_data in chunks:
if not isinstance(chunk_data, dict):
continue
# Depending on the exact docling-serve spec, the text might be nested
chunk_text = chunk_data.get("text", "")
if not chunk_text and isinstance(chunk_data.get("chunk"), dict):
chunk_text = chunk_data["chunk"].get("text", "")
if isinstance(chunk_text, str) and chunk_text.strip():
# Feed the pre-sliced chunks directly into RAGFlow's expected format
sections.extend(self._sections_from_remote_text(chunk_text, parse_method=parse_method))
if callback:
callback(0.95, f"[Docling] Native chunks received: {len(sections)}")
return sections, tables
# --- FALLBACK: Standard RAGFlow parsing for older docling servers ---
docs = self._extract_remote_document_entries(response_json)
if not docs:
raise RuntimeError("[Docling] remote response does not contain parsed documents.")
sections: list[tuple[str, ...]] = []
tables = []
for doc in docs:
md = doc.get("md_content")
txt = doc.get("text_content")