mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
refactor(paddleocr): migrate from sync API to async Job API (#15967)
## Summary
Migrate PaddleOCR integration from the deprecated synchronous HTTP API
to the new asynchronous Job API (`submit → poll → fetch`), aligning with
PaddleOCR 3.6.0+ architecture.
## Changes
### Python (`deepdoc/parser/paddleocr_parser.py`)
- Replace synchronous `requests.post()` with async Job API flow (submit
→ poll → fetch)
- Authentication: `token {token}` → `Bearer {token}`
- File transfer: base64 JSON body → multipart file upload
- Polling: exponential backoff (initial 3s, ×1.5, max 15s, timeout
controlled by `request_timeout`)
- Result: fetch full JSONL from result URL, preserving `prunedResult`
with bbox info for crop functionality
- Rename `api_url` → `base_url` (backward compatible: `api_url` still
accepted as fallback)
### Python (`rag/llm/ocr_model.py`)
- Prefer `paddleocr_base_url` / `PADDLEOCR_BASE_URL`, fallback to
`paddleocr_api_url` / `PADDLEOCR_API_URL`
### Go (`internal/entity/models/paddleocr.go`)
- Add `Client-Platform: ragflow` header to submit and poll requests
- Change polling from fixed 3s to exponential backoff (initial 3s, ×1.5,
max 15s)
### Python (`common/constants.py`)
- Add `PADDLEOCR_BASE_URL` to env keys and default config
## Backward Compatibility
- Old env var `PADDLEOCR_API_URL` still works (used as fallback)
- Frontend field `paddleocr_api_url` still works (backend reads it as
fallback)
- No user-facing configuration changes required for existing setups
## Why not use the `paddleocr` SDK package directly?
RAGFlow's `_transfer_to_sections()` relies on `prunedResult` (containing
`block_bbox`, `block_label`, `parsing_res_list`) from the raw API
response for PDF crop functionality. The SDK's public `parse_document()`
API only returns `DocParsingResult` with `markdown_text`, discarding the
bbox data. Therefore we implement the async Job API flow directly via
HTTP, following the same logic as the SDK internally.
This commit is contained in:
@@ -14,10 +14,12 @@
|
||||
#
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
import time
|
||||
from dataclasses import asdict, dataclass, field, fields
|
||||
from io import BytesIO
|
||||
from os import PathLike
|
||||
@@ -38,16 +40,19 @@ except Exception:
|
||||
class RAGFlowPdfParser:
|
||||
pass
|
||||
|
||||
|
||||
from deepdoc.parser.utils import extract_pdf_outlines
|
||||
|
||||
|
||||
AlgorithmType = Literal["PaddleOCR-VL", "PP-OCRv5", "PP-StructureV3", "PaddleOCR-VL-1.5"]
|
||||
AlgorithmType = Literal["PaddleOCR-VL", "PaddleOCR-VL-1.6", "PP-OCRv5", "PP-OCRv6", "PP-StructureV3", "PaddleOCR-VL-1.5"]
|
||||
SectionTuple = tuple[str, ...]
|
||||
TableTuple = tuple[str, ...]
|
||||
ParseResult = tuple[list[SectionTuple], list[TableTuple]]
|
||||
SUPPORTED_PADDLEOCR_ALGORITHMS: tuple[AlgorithmType, ...] = (
|
||||
"PaddleOCR-VL",
|
||||
"PaddleOCR-VL-1.6",
|
||||
"PP-OCRv5",
|
||||
"PP-OCRv6",
|
||||
"PP-StructureV3",
|
||||
"PaddleOCR-VL-1.5",
|
||||
)
|
||||
@@ -116,7 +121,7 @@ class PaddleOCRVLConfig:
|
||||
class PaddleOCRConfig:
|
||||
"""Main configuration for PaddleOCR parser."""
|
||||
|
||||
api_url: str = ""
|
||||
base_url: str = "https://paddleocr.aistudio-app.com"
|
||||
access_token: Optional[str] = None
|
||||
algorithm: AlgorithmType = "PaddleOCR-VL"
|
||||
request_timeout: int = 600
|
||||
@@ -168,6 +173,9 @@ class PaddleOCRConfig:
|
||||
return cls.from_dict(kwargs)
|
||||
|
||||
|
||||
_DEFAULT_BASE_URL = "https://paddleocr.aistudio-app.com"
|
||||
|
||||
|
||||
class PaddleOCRParser(RAGFlowPdfParser):
|
||||
"""Parser for PDF documents using PaddleOCR API."""
|
||||
|
||||
@@ -216,7 +224,7 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_url: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
access_token: Optional[str] = None,
|
||||
algorithm: AlgorithmType = "PaddleOCR-VL",
|
||||
*,
|
||||
@@ -224,7 +232,7 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
):
|
||||
"""Initialize PaddleOCR parser."""
|
||||
self.outlines = []
|
||||
self.api_url = api_url.rstrip("/") if api_url else os.getenv("PADDLEOCR_API_URL", "")
|
||||
self.base_url = base_url.rstrip("/") if base_url else os.getenv("PADDLEOCR_BASE_URL", _DEFAULT_BASE_URL)
|
||||
self.access_token = access_token or os.getenv("PADDLEOCR_ACCESS_TOKEN")
|
||||
self.algorithm = algorithm
|
||||
self.request_timeout = request_timeout
|
||||
@@ -240,10 +248,8 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
# Public methods
|
||||
def check_installation(self) -> tuple[bool, str]:
|
||||
"""Check if the parser is properly installed and configured."""
|
||||
if not self.api_url:
|
||||
return False, "[PaddleOCR] API URL not configured"
|
||||
|
||||
# TODO [@Bobholamovic]: Check URL availability and token validity
|
||||
if not self.access_token:
|
||||
return False, "[PaddleOCR] Access token not configured"
|
||||
|
||||
return True, ""
|
||||
|
||||
@@ -254,7 +260,7 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
callback: Optional[Callable[[float, str], None]] = None,
|
||||
*,
|
||||
parse_method: str = "raw",
|
||||
api_url: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
access_token: Optional[str] = None,
|
||||
algorithm: Optional[AlgorithmType] = None,
|
||||
request_timeout: Optional[int] = None,
|
||||
@@ -267,9 +273,8 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
) -> ParseResult:
|
||||
"""Parse PDF document using PaddleOCR API."""
|
||||
self.outlines = extract_pdf_outlines(binary if binary is not None else filepath)
|
||||
# Create configuration - pass all kwargs to capture VL config parameters
|
||||
config_dict = {
|
||||
"api_url": api_url if api_url is not None else self.api_url,
|
||||
"base_url": base_url if base_url is not None else self.base_url,
|
||||
"access_token": access_token if access_token is not None else self.access_token,
|
||||
"algorithm": algorithm if algorithm is not None else self.algorithm,
|
||||
"request_timeout": request_timeout if request_timeout is not None else self.request_timeout,
|
||||
@@ -285,10 +290,14 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
if algorithm_config is not None:
|
||||
config_dict["algorithm_config"] = algorithm_config
|
||||
|
||||
# Forward any extra kwargs that match PaddleOCRConfig fields
|
||||
config_field_names = {f.name for f in fields(PaddleOCRConfig)}
|
||||
config_dict.update({k: v for k, v in kwargs.items() if k in config_field_names and v is not None})
|
||||
|
||||
cfg = PaddleOCRConfig.from_dict(config_dict)
|
||||
|
||||
if not cfg.api_url:
|
||||
raise RuntimeError("[PaddleOCR] API URL missing")
|
||||
if not cfg.base_url:
|
||||
raise RuntimeError("[PaddleOCR] Base URL missing")
|
||||
|
||||
# Prepare file data and generate page images for cropping
|
||||
data_bytes = self._prepare_file_data(filepath, binary)
|
||||
@@ -329,11 +338,8 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
return source_path.read_bytes()
|
||||
|
||||
def _build_payload(self, data: bytes, file_type: int, config: PaddleOCRConfig) -> dict[str, Any]:
|
||||
"""Build payload for API request."""
|
||||
payload: dict[str, Any] = {
|
||||
"file": base64.b64encode(data).decode("ascii"),
|
||||
"fileType": file_type,
|
||||
}
|
||||
"""Build optionalPayload for async Job API request."""
|
||||
payload: dict[str, Any] = {}
|
||||
|
||||
# Add common parameters
|
||||
for param_key, param_value in [
|
||||
@@ -359,44 +365,141 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
return payload
|
||||
|
||||
def _send_request(self, data: bytes, config: PaddleOCRConfig, callback: Optional[Callable[[float, str], None]]) -> dict[str, Any]:
|
||||
"""Send request to PaddleOCR API and parse response."""
|
||||
# Build payload
|
||||
payload = self._build_payload(data, self.file_type, config)
|
||||
"""Send request to PaddleOCR async Job API (submit → poll → fetch)."""
|
||||
optional_payload = self._build_payload(data, self.file_type, config)
|
||||
|
||||
# Prepare headers
|
||||
headers = {"Content-Type": "application/json", "Client-Platform": "ragflow"}
|
||||
headers: dict[str, str] = {"Client-Platform": "ragflow"}
|
||||
if config.access_token:
|
||||
headers["Authorization"] = f"token {config.access_token}"
|
||||
headers["Authorization"] = f"Bearer {config.access_token}"
|
||||
|
||||
self.logger.info("[PaddleOCR] invoking API")
|
||||
jobs_url = f"{config.base_url.rstrip('/')}/api/v2/ocr/jobs"
|
||||
deadline = time.monotonic() + config.request_timeout
|
||||
|
||||
def _remaining() -> float:
|
||||
r = deadline - time.monotonic()
|
||||
if r <= 0:
|
||||
raise RuntimeError(f"[PaddleOCR] timed out after {config.request_timeout}s")
|
||||
return r
|
||||
|
||||
self.logger.info("[PaddleOCR] submitting job")
|
||||
if callback:
|
||||
callback(0.1, "[PaddleOCR] submitting request")
|
||||
|
||||
# Send request
|
||||
# Step 1: Submit job with file upload
|
||||
tmp_file = None
|
||||
try:
|
||||
resp = requests.post(config.api_url, json=payload, headers=headers, timeout=self.request_timeout)
|
||||
resp.raise_for_status()
|
||||
tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
|
||||
tmp_file.write(data)
|
||||
tmp_file.close()
|
||||
|
||||
form_data = {
|
||||
"model": config.algorithm,
|
||||
"optionalPayload": json.dumps(optional_payload),
|
||||
}
|
||||
with open(tmp_file.name, "rb") as f:
|
||||
resp = requests.post(
|
||||
jobs_url,
|
||||
data=form_data,
|
||||
files={"file": ("document.pdf", f)},
|
||||
headers=headers,
|
||||
timeout=_remaining(),
|
||||
)
|
||||
except Exception as exc:
|
||||
if callback:
|
||||
callback(-1, f"[PaddleOCR] request failed: {exc}")
|
||||
raise RuntimeError(f"[PaddleOCR] request failed: {exc}")
|
||||
callback(-1, f"[PaddleOCR] submit failed: {exc}")
|
||||
raise RuntimeError(f"[PaddleOCR] submit failed: {exc}")
|
||||
finally:
|
||||
if tmp_file and os.path.exists(tmp_file.name):
|
||||
os.unlink(tmp_file.name)
|
||||
|
||||
if resp.status_code != 200:
|
||||
raise RuntimeError(f"[PaddleOCR] submit failed: HTTP {resp.status_code} {resp.text}")
|
||||
|
||||
# Parse response
|
||||
try:
|
||||
response_data = resp.json()
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"[PaddleOCR] response is not JSON: {exc}") from exc
|
||||
submit_data = resp.json()
|
||||
except ValueError as exc:
|
||||
raise RuntimeError(f"[PaddleOCR] submit response is not JSON: {exc}")
|
||||
job_id = submit_data.get("data", {}).get("jobId") or submit_data.get("jobId")
|
||||
if not job_id:
|
||||
raise RuntimeError(f"[PaddleOCR] job ID not found in response: {submit_data}")
|
||||
|
||||
if callback:
|
||||
callback(0.8, "[PaddleOCR] response received")
|
||||
callback(0.2, f"[PaddleOCR] job submitted: {job_id}")
|
||||
|
||||
# Validate response format
|
||||
if response_data.get("errorCode") != 0 or not isinstance(response_data.get("result"), dict):
|
||||
if callback:
|
||||
callback(-1, "[PaddleOCR] invalid response format")
|
||||
raise RuntimeError("[PaddleOCR] invalid response format")
|
||||
# Step 2: Poll until done (exponential backoff)
|
||||
poll_url = f"{jobs_url}/{job_id}"
|
||||
interval = 3.0
|
||||
multiplier = 1.5
|
||||
max_interval = 15.0
|
||||
self.logger.info(f"[PaddleOCR] polling job {job_id}")
|
||||
|
||||
return response_data["result"]
|
||||
while True:
|
||||
if time.monotonic() >= deadline:
|
||||
raise RuntimeError(f"[PaddleOCR] job {job_id} timed out after {config.request_timeout}s")
|
||||
|
||||
try:
|
||||
poll_resp = requests.get(poll_url, headers=headers, timeout=_remaining())
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"[PaddleOCR] poll failed: {exc}")
|
||||
|
||||
if poll_resp.status_code != 200:
|
||||
raise RuntimeError(f"[PaddleOCR] poll failed: HTTP {poll_resp.status_code} {poll_resp.text[:200]}")
|
||||
|
||||
try:
|
||||
poll_data = poll_resp.json()
|
||||
except ValueError as exc:
|
||||
raise RuntimeError(f"[PaddleOCR] poll response is not JSON: {exc}")
|
||||
state = poll_data.get("data", {}).get("state") or poll_data.get("state")
|
||||
|
||||
if state == "done":
|
||||
self.logger.info(f"[PaddleOCR] job {job_id} done")
|
||||
if callback:
|
||||
callback(0.7, "[PaddleOCR] job done, fetching result")
|
||||
break
|
||||
elif state == "failed":
|
||||
error_msg = poll_data.get("data", {}).get("errorMsg", "Unknown error")
|
||||
self.logger.error(f"[PaddleOCR] job {job_id} failed: {error_msg}")
|
||||
raise RuntimeError(f"[PaddleOCR] job failed: {error_msg}")
|
||||
|
||||
sleep_time = min(interval, max(0, deadline - time.monotonic()))
|
||||
time.sleep(sleep_time)
|
||||
interval = min(interval * multiplier, max_interval)
|
||||
|
||||
# Step 3: Fetch result
|
||||
result_data = poll_data.get("data", {})
|
||||
result_json_url = result_data.get("resultJsonUrl") or (result_data.get("resultUrl") or {}).get("jsonUrl")
|
||||
if not result_json_url:
|
||||
raise RuntimeError(f"[PaddleOCR] result URL not found: {poll_data}")
|
||||
|
||||
try:
|
||||
result_resp = requests.get(result_json_url, timeout=_remaining())
|
||||
result_resp.raise_for_status()
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"[PaddleOCR] failed to fetch result: {exc}")
|
||||
|
||||
# Parse JSONL result
|
||||
jsonl_lines = result_resp.text.strip().split("\n")
|
||||
jsonl_data = []
|
||||
for line in jsonl_lines:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
jsonl_data.append(json.loads(line))
|
||||
except ValueError as exc:
|
||||
raise RuntimeError(f"[PaddleOCR] result JSONL parse error: {exc}")
|
||||
|
||||
if callback:
|
||||
callback(0.8, "[PaddleOCR] result received")
|
||||
|
||||
# Extract raw result (preserving prunedResult with bbox info)
|
||||
combined_result: dict[str, Any] = {"layoutParsingResults": []}
|
||||
for line_obj in jsonl_data:
|
||||
result = line_obj.get("result", {})
|
||||
layout_results = result.get("layoutParsingResults", [])
|
||||
combined_result["layoutParsingResults"].extend(layout_results)
|
||||
|
||||
return combined_result
|
||||
|
||||
def _transfer_to_sections(self, result: dict[str, Any], algorithm: AlgorithmType, parse_method: str) -> list[SectionTuple]:
|
||||
"""Convert API response to section tuples."""
|
||||
@@ -607,6 +710,9 @@ class PaddleOCRParser(RAGFlowPdfParser):
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
parser = PaddleOCRParser(api_url=os.getenv("PADDLEOCR_API_URL", ""), algorithm=os.getenv("PADDLEOCR_ALGORITHM", "PaddleOCR-VL"))
|
||||
parser = PaddleOCRParser(
|
||||
base_url=os.getenv("PADDLEOCR_BASE_URL") or None,
|
||||
algorithm=os.getenv("PADDLEOCR_ALGORITHM", "PaddleOCR-VL"),
|
||||
)
|
||||
ok, reason = parser.check_installation()
|
||||
print("PaddleOCR available:", ok, reason)
|
||||
|
||||
Reference in New Issue
Block a user