mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-01 08:15:44 +08:00
### What problem does this PR solve? Force image parser runtime output format to JSON so downstream chunking reads OCR results from the JSON output and image parser chunks can be displayed. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) Co-authored-by: Wang Qi <wangq8@outlook.com>
1339 lines
55 KiB
Python
1339 lines
55 KiB
Python
#
|
|
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import asyncio
|
|
import io
|
|
import json
|
|
import os
|
|
import random
|
|
import re
|
|
from functools import partial
|
|
|
|
from litellm import logging
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
from api.db.services.file2document_service import File2DocumentService
|
|
from api.db.services.file_service import FileService
|
|
from api.db.services.llm_service import LLMBundle
|
|
from api.db.joint_services.tenant_model_service import get_tenant_default_model_by_type, get_model_config_from_provider_instance
|
|
from common import settings
|
|
from common.constants import LLMType
|
|
from common.misc_utils import get_uuid, thread_pool_exec
|
|
from deepdoc.parser import ExcelParser, HtmlParser, TxtParser
|
|
from deepdoc.parser.docling_parser import DoclingParser
|
|
from deepdoc.parser.pdf_parser import PlainParser, RAGFlowPdfParser, VisionParser
|
|
from deepdoc.parser.tcadp_parser import TCADPParser
|
|
from rag.app.naive import Docx
|
|
from rag.flow.base import ProcessBase, ProcessParamBase
|
|
from rag.flow.parser.pdf_chunk_metadata import (
|
|
extract_pdf_positions,
|
|
normalize_pdf_items_metadata,
|
|
reorder_multi_column_bboxes,
|
|
)
|
|
from rag.flow.parser.schema import ParserFromUpstream
|
|
from rag.flow.parser.utils import (
|
|
enhance_media_sections_with_vision,
|
|
extract_word_outlines,
|
|
extract_docx_header_footer_texts,
|
|
remove_header_footer_docx_sections,
|
|
remove_header_footer_html_blob,
|
|
remove_toc,
|
|
remove_toc_pdf,
|
|
remove_toc_word,
|
|
)
|
|
from rag.llm.cv_model import Base as VLM
|
|
from rag.utils.base64_image import image2id
|
|
|
|
|
|
class ParserParam(ProcessParamBase):
    """Parameters of the Parser component.

    ``allowed_output_format`` lists, per file family, the output formats that
    ``check`` accepts. ``setups`` holds the per-family parsing options,
    including the file suffixes of the family and the selected output format.
    """

    def __init__(self):
        super().__init__()

        # Output formats accepted for each file family.
        self.allowed_output_format = {
            "pdf": ["json", "markdown"],
            "spreadsheet": ["json", "markdown", "html"],
            "doc": ["json", "markdown"],
            "docx": ["json", "markdown"],
            "slides": ["json"],
            "image": ["json"],
            "email": ["text", "json"],
            "markdown": ["text", "json"],
            "text&code": ["text", "json"],
            "html": ["text", "json"],
            "audio": ["json"],
            "video": [],
            "epub": ["text", "json"],
        }

        # Default parsing options for each file family.
        self.setups = {
            "pdf": {
                "parse_method": "deepdoc",  # deepdoc/plain_text/tcadp_parser/vlm
                "lang": "Chinese",
                "flatten_media_to_text": False,
                "remove_toc": False,
                "remove_header_footer": False,
                "suffix": ["pdf"],
                "output_format": "json",
            },
            "spreadsheet": {
                "parse_method": "deepdoc",  # deepdoc/tcadp_parser
                "flatten_media_to_text": False,
                "output_format": "html",
                "suffix": ["xls", "xlsx", "csv"],
            },
            "doc": {
                "remove_toc": False,
                "remove_header_footer": False,
                "suffix": ["doc"],
                "output_format": "json",
            },
            "docx": {
                "flatten_media_to_text": False,
                "remove_toc": False,
                "remove_header_footer": False,
                "suffix": ["docx"],
                "output_format": "json",
            },
            "markdown": {
                "flatten_media_to_text": False,
                "suffix": ["md", "markdown", "mdx"],
                "remove_toc": False,
                "output_format": "json",
            },
            "text&code": {
                "suffix": ["txt", "py", "js", "java", "c", "cpp", "h", "php", "go", "ts", "sh", "cs", "kt", "sql"],
                "output_format": "json",
            },
            "html": {
                "suffix": ["htm", "html"],
                "remove_toc": False,
                "remove_header_footer": False,
                "output_format": "json",
            },
            "slides": {
                "parse_method": "deepdoc",  # deepdoc/tcadp_parser
                "suffix": ["pptx", "ppt"],
                "output_format": "json",
            },
            "image": {
                "parse_method": "ocr",
                "llm_id": "",
                "lang": "Chinese",
                "system_prompt": "",
                "suffix": ["jpg", "jpeg", "png", "gif"],
                "output_format": "json",
            },
            "email": {
                "suffix": ["eml", "msg"],
                "fields": ["from", "to", "cc", "bcc", "date", "subject", "body", "attachments", "metadata"],
                "output_format": "json",
            },
            "audio": {
                "suffix": [
                    "da",
                    "wave",
                    "wav",
                    "mp3",
                    "aac",
                    "flac",
                    "ogg",
                    "aiff",
                    "au",
                    "midi",
                    "wma",
                    "realaudio",
                    "vqf",
                    "oggvorbis",
                    "ape",
                ],
                "output_format": "text",
            },
            "video": {
                "suffix": ["mp4", "avi", "mkv"],
                "output_format": "text",
                "prompt": "",
            },
            "epub": {
                "suffix": ["epub"],
                "output_format": "json",
            },
        }

    def check(self):
        """Validate every configured file family.

        Families whose setup is missing or empty are skipped. Validation is
        delegated to ``check_empty`` / ``check_valid_value`` (inherited), in a
        fixed family order so the first failing family is always the same.
        """

        def validate_output_format(family, message):
            # The output format of a configured family must be in its allow-list.
            setup = self.setups.get(family, "")
            if setup:
                self.check_valid_value(setup.get("output_format", ""), message, self.allowed_output_format[family])

        def require_vlm(family, label):
            # Audio/video setups must name the model under setup["vlm"]["llm_id"].
            setup = self.setups.get(family, "")
            if setup:
                vlm = setup.get("vlm") or {}
                self.check_empty(vlm.get("llm_id"), label)

        pdf_setup = self.setups.get("pdf", {})
        if pdf_setup:
            method = pdf_setup.get("parse_method", "")
            self.check_empty(method, "Parse method abnormal.")

            # Any method outside this list is treated as a vision model name,
            # which additionally needs a language.
            known_methods = ["deepdoc", "plain_text", "mineru", "docling", "opendataloader", "tcadp parser", "paddleocr"]
            if method.lower() not in known_methods:
                self.check_empty(pdf_setup.get("lang", ""), "PDF VLM language")

            self.check_valid_value(pdf_setup.get("output_format", ""), "PDF output format abnormal.", self.allowed_output_format["pdf"])

        validate_output_format("spreadsheet", "Spreadsheet output format abnormal.")
        validate_output_format("doc", "DOC output format abnormal.")
        validate_output_format("docx", "DOCX output format abnormal.")
        validate_output_format("slides", "Slides output format abnormal.")

        image_setup = self.setups.get("image", "")
        if image_setup:
            # Only non-OCR image parsing needs a language.
            if image_setup.get("parse_method", "") not in ["ocr"]:
                self.check_empty(image_setup.get("lang", ""), "Image VLM language")

        validate_output_format("markdown", "Markdown output format abnormal.")
        validate_output_format("text&code", "Text&Code output format abnormal.")
        validate_output_format("html", "HTML output format abnormal.")

        require_vlm("audio", "Audio VLM")
        require_vlm("video", "Video VLM")

        validate_output_format("email", "Email output format abnormal.")
        validate_output_format("epub", "EPUB output format abnormal.")

    def get_input_form(self) -> dict[str, dict]:
        """Return the input form description; this component declares none."""
        return {}
|
|
|
|
|
|
class Parser(ProcessBase):
|
|
component_name = "Parser"
|
|
|
|
def _pdf(self, name, blob, **kwargs):
    """Parse a PDF into layout boxes and emit them as JSON or markdown.

    The parser backend is chosen from ``setups["pdf"]["parse_method"]``:
    ``deepdoc``, ``plain_text``, ``mineru``, ``docling``, ``opendataloader``,
    ``tcadp parser`` or ``paddleocr`` (case-insensitive). A value ending in
    ``@MinerU`` / ``@PaddleOCR`` selects that backend with the prefix as the
    model name. Anything else is treated as a vision model name.

    Outputs set: ``output_format``, ``file`` (incoming file info plus the
    parser's outlines) and either ``json`` (list of box dicts) or
    ``markdown`` (str).

    Args:
        name: File name/path, forwarded to the backends as ``filepath``.
        blob: Raw PDF bytes.
        **kwargs: May carry ``file``, a dict merged into the ``file`` output.

    Raises:
        RuntimeError: If MinerU/OpenDataLoader/PaddleOCR is selected but no
            model name can be resolved.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on a PDF.")
    conf = self._param.setups["pdf"]
    self.set_output("output_format", conf["output_format"])
    flatten_media_to_text = conf.get("flatten_media_to_text")
    tenant_id = self._canvas._tenant_id
    pdf_parser = None

    # Normalize parser selection and optional provider-specific model name.
    raw_parse_method = conf.get("parse_method", "")
    parser_model_name = None
    parse_method = raw_parse_method or ""
    if isinstance(raw_parse_method, str):
        lowered = raw_parse_method.lower()
        if lowered.endswith("@mineru"):
            parser_model_name = raw_parse_method.rsplit("@", 1)[0]
            parse_method = "MinerU"
        elif lowered.endswith("@paddleocr"):
            parser_model_name = raw_parse_method.rsplit("@", 1)[0]
            parse_method = "PaddleOCR"
    method = parse_method.lower()

    def resolve_ocr_llm_name(conf_key, factory, ensure_from_env):
        # Model name priority: "<name>@<factory>" suffix form, then the
        # explicit config key, then the tenant's first registered model of
        # that factory, then whatever the env-based bootstrap returned.
        configured = parser_model_name or conf.get(conf_key)
        if configured:
            return configured
        if not tenant_id:
            return None

        from api.db.services.tenant_llm_service import TenantLLMService

        env_name = getattr(TenantLLMService, ensure_from_env)(tenant_id)
        candidates = TenantLLMService.query(tenant_id=tenant_id, llm_factory=factory, model_type=LLMType.OCR.value)
        if candidates:
            return candidates[0].llm_name
        return env_name

    def load_ocr_parser(model_name, **bundle_kwargs):
        # The OCR model object wrapped by LLMBundle doubles as the PDF parser.
        ocr_model_config = get_model_config_from_provider_instance(tenant_id, LLMType.OCR, model_name)
        return LLMBundle(tenant_id, ocr_model_config, **bundle_kwargs).mdl

    def boxes_from_tagged_lines(parser, lines, crop_args, require_str_tag):
        # Convert (text, layout_type, position_tag, ...) tuples into the shared
        # box dict. Page indices from extract_positions are shifted by +1.
        boxes = []
        for line in lines or []:
            if not isinstance(line, tuple) or len(line) < 3:
                continue
            text, layout_type, poss = line[0], line[1], line[2]
            box = {
                "text": text,
                "layout_type": layout_type or "text",
            }
            if not require_str_tag or (isinstance(poss, str) and poss):
                positions = [[pos[0][-1] + 1, *pos[1:]] for pos in parser.extract_positions(poss)]
                if positions:
                    box["positions"] = positions
                image = parser.crop(poss, *crop_args)
                if image is not None:
                    box["image"] = image
            boxes.append(box)
        return boxes

    # DeepDOC returns structured page boxes directly.
    if method == "deepdoc":
        pdf_parser = RAGFlowPdfParser()
        bboxes = pdf_parser.parse_into_bboxes(blob, callback=self.callback)
        if conf.get("enable_multi_column"):
            bboxes = reorder_multi_column_bboxes(pdf_parser, bboxes)

    # Plain text only keeps extracted text lines.
    elif method == "plain_text":
        pdf_parser = PlainParser()
        lines, _ = pdf_parser(blob)
        bboxes = [{"text": t, "layout_type": "text"} for t, _ in lines]

    # MinerU/PaddleOCR/Docling/OpenDataLoader return line-like sections that
    # are converted into the shared bbox-like structure used below.
    elif method == "mineru":
        parser_model_name = resolve_ocr_llm_name("mineru_llm_name", "MinerU", "ensure_mineru_from_env")
        if not parser_model_name:
            raise RuntimeError("MinerU model not configured. Please add MinerU in Model Providers or set MINERU_* env.")

        pdf_parser = load_ocr_parser(parser_model_name, lang=conf.get("lang", "Chinese"))
        lines, _ = pdf_parser.parse_pdf(
            filepath=name,
            binary=blob,
            callback=self.callback,
            parse_method="pipeline",
            lang=conf.get("lang", "Chinese"),
        )
        bboxes = boxes_from_tagged_lines(pdf_parser, lines, (1,), require_str_tag=False)

    elif method == "docling":
        docling_server_url = os.environ.get("DOCLING_SERVER_URL", "")
        pdf_parser = DoclingParser(docling_server_url=docling_server_url)
        lines, _ = pdf_parser.parse_pdf(
            filepath=name,
            binary=blob,
            callback=self.callback,
            parse_method="pipeline",
            docling_server_url=docling_server_url,
        )
        bboxes = boxes_from_tagged_lines(pdf_parser, lines, (1,), require_str_tag=True)

    elif method == "opendataloader":
        parser_model_name = resolve_ocr_llm_name("opendataloader_llm_name", "OpenDataLoader", "ensure_opendataloader_from_env")
        if not parser_model_name:
            raise RuntimeError("OpenDataLoader model not configured. Please add OpenDataLoader in Model Providers.")

        pdf_parser = load_ocr_parser(parser_model_name)
        lines, odl_tables = pdf_parser.parse_pdf(
            filepath=name,
            binary=blob,
            callback=self.callback,
            parse_method="pipeline",
        )
        bboxes = boxes_from_tagged_lines(pdf_parser, lines, (1,), require_str_tag=True)

        # Merge tables and images from the second return value. A list payload
        # is treated as figure captions, anything else as a table.
        for (img, html_or_caption), positions in odl_tables or []:
            box = {"layout_type": "table" if not isinstance(html_or_caption, list) else "figure"}
            if isinstance(html_or_caption, str):
                box["text"] = html_or_caption
            elif isinstance(html_or_caption, list):
                box["text"] = html_or_caption[0] if html_or_caption else ""
            if img is not None:
                box["image"] = img
            if positions:
                try:
                    box["positions"] = [[p[0] + 1, p[1], p[2], p[3], p[4]] for p in positions]
                except Exception:
                    # Best effort: a malformed position list only costs this
                    # box its positions, the box itself is still emitted.
                    pass
            bboxes.append(box)

    elif method == "tcadp parser":
        # ADP is a document parsing tool using Tencent Cloud API
        pdf_parser = TCADPParser(
            table_result_type=conf.get("table_result_type", "1"),
            markdown_image_response_type=conf.get("markdown_image_response_type", "1"),
        )
        sections, _ = pdf_parser.parse_pdf(
            filepath=name,
            binary=blob,
            callback=self.callback,
            file_type="PDF",
            file_start_page=1,
            file_end_page=1000,
        )
        bboxes = []
        for section, position_tag in sections:
            match = None
            if position_tag:
                match = re.match(r"@@([0-9-]+)\t([0-9.]+)\t([0-9.]+)\t([0-9.]+)\t([0-9.]+)##", position_tag)
            if not match:
                # No (parsable) position tag: keep the text without coordinates.
                bboxes.append({"text": section, "layout_type": "text"})
                continue
            pn, x0, x1, top, bott = match.groups()
            bboxes.append(
                {
                    "page_number": int(pn.split("-")[0]),
                    "x0": float(x0),
                    "x1": float(x1),
                    "top": float(top),
                    "bottom": float(bott),
                    "text": section,
                    "layout_type": "text",
                }
            )

    elif method == "paddleocr":
        parser_model_name = resolve_ocr_llm_name("paddleocr_llm_name", "PaddleOCR", "ensure_paddleocr_from_env")
        if not parser_model_name:
            raise RuntimeError("PaddleOCR model not configured. Please add PaddleOCR in Model Providers or set PADDLEOCR_* env.")

        pdf_parser = load_ocr_parser(parser_model_name)
        lines, _ = pdf_parser.parse_pdf(
            filepath=name,
            binary=blob,
            callback=self.callback,
            parse_method="pipeline",
        )
        bboxes = boxes_from_tagged_lines(pdf_parser, lines, (), require_str_tag=False)

    # Vision parser treats each page as a large image block.
    else:
        if conf.get("parse_method"):
            vision_model_config = get_model_config_from_provider_instance(tenant_id, LLMType.IMAGE2TEXT, conf["parse_method"])
        else:
            vision_model_config = get_tenant_default_model_by_type(tenant_id, LLMType.IMAGE2TEXT)
        vision_model = LLMBundle(tenant_id, vision_model_config, lang=conf.get("lang"))
        pdf_parser = VisionParser(vision_model=vision_model)
        lines, _ = pdf_parser(blob, callback=self.callback)
        bboxes = []
        for t, poss in lines:
            for pn, x0, x1, top, bott in RAGFlowPdfParser.extract_positions(poss):
                bboxes.append(
                    {
                        "page_number": int(pn[0]) + 1,
                        "x0": float(x0),
                        "x1": float(x1),
                        "top": float(top),
                        "bottom": float(bott),
                        "text": t,
                        "layout_type": "text",
                    }
                )

    # Persist outlines and optionally remove TOC before normalizing metadata.
    outlines = pdf_parser.outlines
    self.set_output("file", {**kwargs.get("file", {}), "outlines": outlines})
    if conf.get("remove_toc"):
        if not outlines:
            bboxes, _ = remove_toc(bboxes)
        elif outlines[0][2] == 1:
            bboxes = remove_toc_pdf(bboxes, outlines)
        else:
            # The first outline entry is not on page 1: only the boxes before
            # that page are searched for a table of contents.
            first_outline_page = outlines[0][2]
            split_at = len(bboxes)
            for i, item in enumerate(bboxes):
                page_number = item.get("page_number")
                if page_number is None:
                    positions = extract_pdf_positions(item)
                    if positions:
                        page_number = positions[0][0]
                if page_number is not None and page_number >= first_outline_page:
                    split_at = i
                    break
            toc_bboxes, _ = remove_toc(bboxes[:split_at])
            bboxes = toc_bboxes + bboxes[split_at:]

    # Normalize shared bbox fields for downstream consumers.
    normalize_bboxes = []
    for b in bboxes:
        raw_layout = str(b.get("layout_type") or "").strip()
        has_layout = bool(raw_layout)
        layout = re.sub(r"\s+", " ", raw_layout) if has_layout else "text"
        b["layout_type"] = layout
        if conf.get("remove_header_footer") and re.search(r"(header|footer|number)", raw_layout, re.I):
            continue
        if flatten_media_to_text:
            b["doc_type_kwd"] = "text"
        elif layout == "table":
            b["doc_type_kwd"] = "table"
        elif layout == "figure":
            b["doc_type_kwd"] = "image"
        elif not has_layout and b.get("image") is not None:
            b["doc_type_kwd"] = "image"
        else:
            b["doc_type_kwd"] = "text"
        normalize_bboxes.append(b)
    bboxes = normalize_bboxes

    enhance_media_sections_with_vision(
        bboxes,
        tenant_id,
        conf.get("vlm"),
        callback=self.callback,
    )

    # Emit the requested final PDF output format.
    output_format = conf.get("output_format")
    if output_format == "json":
        normalize_pdf_items_metadata(bboxes)
        self.set_output("json", bboxes)
    if output_format == "markdown":
        parts = []
        for b in bboxes:
            layout = b.get("layout_type", "")
            if layout == "title":
                parts.append("\n## ")
            if layout == "figure" and b.get("image") is not None:
                # Embed the figure as a markdown image. The format string needs
                # the "{}" placeholder, otherwise the encoded image is dropped.
                parts.append("\n![Image]({})".format(VLM.image2base64(b["image"])))
                continue
            # Figures without an image fall through and contribute their text.
            parts.append((b.get("text") or "") + "\n")
        self.set_output("markdown", "".join(parts))
|
|
|
|
def _spreadsheet(self, name, blob, **kwargs):
    """Parse a spreadsheet and emit it as html, json or markdown.

    With ``parse_method == "tcadp parser"`` the file is sent to the TCADP
    parser and its sections/tables are combined into the configured output
    format. Otherwise the DeepDOC ``ExcelParser`` is used.

    Args:
        name: File name; its extension selects the TCADP file type.
        blob: Raw file bytes.
        **kwargs: Unused here.

    Raises:
        RuntimeError: If the TCADP parser is selected but not available.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on a Spreadsheet.")
    conf = self._param.setups["spreadsheet"]
    self.set_output("output_format", conf["output_format"])
    flatten_media_to_text = conf.get("flatten_media_to_text")

    parse_method = conf.get("parse_method", "deepdoc")

    # Handle TCADP parser
    if parse_method.lower() == "tcadp parser":
        tcadp_parser = TCADPParser(
            table_result_type=conf.get("table_result_type", "1"),
            markdown_image_response_type=conf.get("markdown_image_response_type", "1"),
        )
        if not tcadp_parser.check_installation():
            raise RuntimeError("TCADP parser not available. Please check Tencent Cloud API configuration.")

        # Determine file type based on extension: .xls/.xlsx -> XLSX, else CSV.
        file_type = "XLSX" if re.search(r"\.xlsx?$", name, re.IGNORECASE) else "CSV"

        self.callback(0.2, f"Using TCADP parser for {file_type} file.")
        sections, tables = tcadp_parser.parse_pdf(
            filepath=name,
            binary=blob,
            callback=self.callback,
            file_type=file_type,
            file_start_page=1,
            file_end_page=1000,
        )

        # Non-empty section texts first, then non-empty tables.
        section_texts = [section for section, _ in sections if section]
        table_texts = [table for table in tables if table]

        # Process TCADP parser output based on configured output_format
        output_format = conf.get("output_format", "html")
        if output_format == "html":
            self.set_output("html", "".join(part + "\n" for part in section_texts + table_texts))
        elif output_format == "json":
            table_kwd = "text" if flatten_media_to_text else "table"
            result = [{"text": section, "doc_type_kwd": "text"} for section in section_texts]
            result.extend({"text": table, "doc_type_kwd": table_kwd} for table in table_texts)
            self.set_output("json", result)
        elif output_format == "markdown":
            self.set_output("markdown", "".join(part + "\n\n" for part in section_texts + table_texts))
        return

    # Default DeepDOC parser
    spreadsheet_parser = ExcelParser()
    output_format = conf.get("output_format")
    if output_format == "html":
        htmls = spreadsheet_parser.html(blob, 1000000000)
        # An empty result would make htmls[0] raise IndexError; emit an empty
        # document instead.
        self.set_output("html", htmls[0] if htmls else "")
    elif output_format == "json":
        self.set_output("json", [{"text": txt, "doc_type_kwd": "text"} for txt in spreadsheet_parser(blob) if txt])
    elif output_format == "markdown":
        self.set_output("markdown", spreadsheet_parser.markdown(blob))
|
|
|
|
def _doc(self, name, blob, **kwargs):
    """Parse a legacy DOC file with Tika into non-empty text lines.

    Emits ``json`` (one ``{"text", "doc_type_kwd": "text"}`` item per line)
    when the configured output format is ``json``, otherwise ``markdown``
    (the lines joined with newlines).

    Args:
        name: File name (not used for parsing).
        blob: Raw file bytes.
        **kwargs: Unused here.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on a DOC document")
    conf = self._param.setups["doc"]
    self.set_output("output_format", conf["output_format"])

    from tika import parser as tika_parser

    parsed = tika_parser.from_buffer(io.BytesIO(blob))
    # Tika may report no content (missing or None); treat that as an empty
    # document instead of failing on None.split().
    content = parsed.get("content") or ""
    sections = [line for line in content.split("\n") if line]

    if conf.get("output_format") == "json":
        self.set_output("json", [{"text": section, "doc_type_kwd": "text"} for section in sections])
        return

    self.set_output("markdown", "\n".join(sections))
|
|
|
|
def _docx(self, name, blob, **kwargs):
    """Parse a DOCX file (or a ``.doc`` routed here) into json or markdown.

    Files whose name ends in ``.doc`` are parsed with Tika as plain text
    lines. Real DOCX files go through the ``Docx`` parser, with optional
    header/footer and table-of-contents removal.

    Outputs set: ``output_format``, ``file`` (incoming file info plus
    ``outlines``) and ``json`` or ``markdown``. In the ``.doc`` path nothing
    but ``file`` is emitted when Tika is unavailable or returns no content.

    Args:
        name: File name; decides between the ``.doc`` and DOCX paths.
        blob: Raw file bytes.
        **kwargs: May carry ``file``, a dict merged into the ``file`` output.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on a DOCX document")
    conf = self._param.setups["docx"]
    self.set_output("output_format", conf["output_format"])
    flatten_media_to_text = conf.get("flatten_media_to_text")
    output_format = conf.get("output_format")

    if re.search(r"\.doc$", name, re.IGNORECASE):
        # The module-level ``logging`` name is imported from litellm, not the
        # standard library; import the stdlib module locally so the
        # ``.warning()`` calls below are guaranteed to work.
        import logging

        self.set_output("file", {**kwargs.get("file", {}), "outlines": []})
        try:
            from tika import parser as tika_parser
        except Exception as e:
            msg = f"tika not available: {e}. Unsupported .doc parsing."
            self.callback(0.8, msg)
            logging.warning(f"{msg} for {name}.")
            return

        doc_parsed = tika_parser.from_buffer(io.BytesIO(blob))
        content = doc_parsed.get("content")
        if content is None:
            msg = f"tika.parser got empty content from {name}."
            self.callback(0.8, msg)
            logging.warning(msg)
            return

        sections = [line.strip() for line in content.splitlines() if line and line.strip()]
        if conf.get("remove_toc"):
            # No outlines are available for .doc, so pass an empty list.
            sections = remove_toc_word(sections, [])

        if output_format == "json":
            self.set_output(
                "json",
                [{"text": line, "image": None, "doc_type_kwd": "text"} for line in sections],
            )
        elif output_format == "markdown":
            # Tika gives us plain text lines, so join with blank lines to preserve paragraph boundaries in markdown.
            self.set_output("markdown", "\n\n".join(sections))

        self.callback(0.8, "Finish parsing.")
        return

    docx_parser = Docx()

    # Extract heading-based outlines for metadata and TOC removal.
    outlines = extract_word_outlines(name, blob)
    self.set_output("file", {**kwargs.get("file", {}), "outlines": outlines})

    # JSON output keeps text/image blocks and appends table HTML as table items.
    if output_format == "json":
        main_sections = docx_parser(name, binary=blob)
        if conf.get("remove_header_footer"):
            header_footer_texts = extract_docx_header_footer_texts(binary=blob)
            main_sections = remove_header_footer_docx_sections(main_sections, header_footer_texts)
        if conf.get("remove_toc"):
            main_sections = remove_toc_word(main_sections, outlines)

        sections = []
        for text, image, html in main_sections:
            sections.append(
                {
                    "text": text,
                    "image": image,
                    "doc_type_kwd": "text" if flatten_media_to_text or image is None else "image",
                }
            )
            if html:
                sections.append(
                    {
                        "text": html,
                        "image": None,
                        "doc_type_kwd": "text" if flatten_media_to_text else "table",
                    }
                )
        enhance_media_sections_with_vision(
            sections,
            self._canvas._tenant_id,
            conf.get("vlm"),
            callback=self.callback,
        )

        self.set_output("json", sections)

    # Markdown output removes TOC on plain markdown lines before writing back.
    elif output_format == "markdown":
        markdown_text = docx_parser.to_markdown(name, binary=blob)
        if conf.get("remove_header_footer"):
            header_footer_texts = extract_docx_header_footer_texts(binary=blob)
            markdown_lines = remove_header_footer_docx_sections(markdown_text.split("\n"), header_footer_texts)
            markdown_text = "\n".join(markdown_lines)
        if conf.get("remove_toc"):
            markdown_text = "\n".join(remove_toc_word(markdown_text.split("\n"), outlines))

        self.set_output("markdown", markdown_text)
|
|
|
|
def _slides(self, name, blob, **kwargs):
    """Parse a presentation file into json sections.

    With ``parse_method == "tcadp parser"`` the file is sent to the TCADP
    parser; otherwise the DeepDOC PPT parser is used. Only the ``json``
    output format is supported.

    Args:
        name: File name; its extension selects the TCADP file type.
        blob: Raw file bytes.
        **kwargs: Unused here.

    Raises:
        RuntimeError: If the TCADP parser is selected but not available.
        AssertionError: In the DeepDOC path, if the output format is not json.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on a PowerPoint Document")

    conf = self._param.setups["slides"]
    self.set_output("output_format", conf["output_format"])

    parse_method = conf.get("parse_method", "deepdoc")

    # Handle TCADP parser
    if parse_method.lower() == "tcadp parser":
        tcadp_parser = TCADPParser(
            table_result_type=conf.get("table_result_type", "1"),
            markdown_image_response_type=conf.get("markdown_image_response_type", "1"),
        )
        if not tcadp_parser.check_installation():
            raise RuntimeError("TCADP parser not available. Please check Tencent Cloud API configuration.")

        # Determine file type based on extension. Only a real ".pptx" suffix
        # maps to PPTX; the previous pattern r"\.pptx?$" also matched ".ppt",
        # so legacy files were labelled PPTX and the PPT branch was dead.
        file_type = "PPTX" if re.search(r"\.pptx$", name, re.IGNORECASE) else "PPT"

        self.callback(0.2, f"Using TCADP parser for {file_type} file.")

        sections, tables = tcadp_parser.parse_pdf(
            filepath=name,
            binary=blob,
            callback=self.callback,
            file_type=file_type,
            file_start_page=1,
            file_end_page=1000,
        )

        # Process TCADP parser output - PPT only supports json format
        if conf.get("output_format", "json") == "json":
            # Sections become text items, tables become table items.
            result = [{"text": section, "doc_type_kwd": "text"} for section, _ in sections if section]
            result.extend({"text": table, "doc_type_kwd": "table"} for table in tables if table)
            self.set_output("json", result)
        return

    # Default DeepDOC parser (supports .pptx format)
    from deepdoc.parser.ppt_parser import RAGFlowPptParser

    ppt_parser = RAGFlowPptParser()
    txts = ppt_parser(blob, 0, 100000, None)

    sections = [{"text": section, "doc_type_kwd": "text"} for section in txts if section.strip()]

    # json
    assert conf.get("output_format") == "json", "have to be json for ppt"
    if conf.get("output_format") == "json":
        self.set_output("json", sections)
|
|
|
|
def _markdown(self, name, blob, **kwargs):
    """Parse a markdown file and emit it as json items or plain text.

    For the ``json`` output format each section becomes an item carrying its
    text, an optional image and a ``doc_type_kwd``; tables are appended as
    separate items. For any other format the section and table texts are
    joined with newlines and emitted as ``text``.
    """
    from functools import reduce

    from rag.app.naive import Markdown as naive_markdown_parser
    from rag.nlp import concat_img

    self.callback(random.randint(1, 5) / 100.0, "Start to work on a markdown.")
    conf = self._param.setups["markdown"]
    self.set_output("output_format", conf["output_format"])
    flatten_media_to_text = conf.get("flatten_media_to_text")

    parser = naive_markdown_parser()
    sections, tables, section_images = parser(
        name,
        blob,
        separate_tables=False,
        delimiter=conf.get("delimiter"),
        return_section_images=True,
    )

    # Plain-text output: section texts first, then table texts.
    if conf.get("output_format") != "json":
        texts = [section_text for section_text, _ in sections if section_text]
        texts.extend(table[0][1] for table in tables if table and table[0] and table[0][1])
        self.set_output("text", "\n".join(texts))
        return

    json_results = []
    for idx, (section_text, _) in enumerate(sections):
        item = {"text": section_text}

        # Attach the image recorded for this section index, if there is one.
        has_image = bool(section_images) and idx < len(section_images) and section_images[idx] is not None
        images = [section_images[idx]] if has_image else []
        if images:
            # If multiple images found, combine them using concat_img
            item["image"] = reduce(concat_img, images) if len(images) > 1 else images[0]

        if not flatten_media_to_text and item.get("image") is not None:
            item["doc_type_kwd"] = "image"
        else:
            item["doc_type_kwd"] = "text"
        json_results.append(item)

    table_kwd = "text" if flatten_media_to_text else "table"
    for table in tables:
        table_text = table[0][1] if table and table[0] else ""
        if not table_text:
            continue
        json_results.append({"text": table_text, "doc_type_kwd": table_kwd})

    enhance_media_sections_with_vision(
        json_results,
        self._canvas._tenant_id,
        conf.get("vlm"),
        callback=self.callback,
    )
    self.set_output("json", json_results)
|
|
|
|
def _code(self, name, blob, **kwargs):
    """Parse a text or source-code file with ``TxtParser``.

    Uses the ``text&code`` entry of ``self._param.setups``. The first element
    of every parsed item is taken as its text and empty texts are dropped.
    The result is stored under ``json`` (one ``{"text", "doc_type_kwd"}`` dict
    per item) when ``output_format`` is ``"json"``, otherwise under ``text``
    as a newline-joined string.

    Args:
        name: File name passed through to the parser.
        blob: File content passed through to the parser.
        **kwargs: Accepted for dispatcher compatibility; not used here.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on a text or code file.")
    conf = self._param.setups["text&code"]
    self.set_output("output_format", conf["output_format"])

    txt_parser = TxtParser()
    chunk_token_num = conf.get("chunk_token_num", 128)
    delimiter = conf.get("delimiter", "\n!?;。;!?")
    parsed = txt_parser(name, blob, chunk_token_num, delimiter)

    texts = [item[0] for item in parsed if item[0]]

    if conf.get("output_format") == "json":
        self.set_output("json", [{"text": text, "doc_type_kwd": "text"} for text in texts])
    else:
        self.set_output("text", "\n".join(texts))
|
|
|
|
def _html(self, name, blob, **kwargs):
    """Parse an HTML file with ``HtmlParser``.

    Uses the ``html`` entry of ``self._param.setups``. Optionally strips
    header/footer markup from the blob before parsing
    (``remove_header_footer``) and drops the table of contents from the parsed
    sections (``remove_toc``). Non-empty sections are stored under ``json``
    when ``output_format`` is ``"json"``, otherwise newline-joined under
    ``text``.

    Args:
        name: File name passed through to the parser.
        blob: File content; may be rewritten by the header/footer filter.
        **kwargs: Accepted for dispatcher compatibility; not used here.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on an HTML document.")
    conf = self._param.setups["html"]
    self.set_output("output_format", conf["output_format"])

    if conf.get("remove_header_footer"):
        blob = remove_header_footer_html_blob(blob)

    html_parser = HtmlParser()
    sections = html_parser(name, blob, int(conf.get("chunk_token_num", 512)))

    if conf.get("remove_toc"):
        sections, _ = remove_toc(sections)

    kept = [section for section in sections if section]

    if conf.get("output_format") == "json":
        self.set_output("json", [{"text": section, "doc_type_kwd": "text"} for section in kept])
    else:
        self.set_output("text", "\n".join(kept))
|
|
|
|
def _image(self, name, blob, **kwargs):
    """Turn an image into a single JSON chunk using OCR or an image-to-text model.

    Uses the ``image`` entry of ``self._param.setups``. When ``parse_method``
    is ``"ocr"`` the recognised strings are joined with newlines; any other
    value is treated as the name of an image-to-text model, which is asked to
    describe the image (with ``system_prompt`` when one is configured). The
    ``output_format`` output is always set to ``"json"`` and the ``json``
    output holds one dict with ``text``, ``image`` and ``doc_type_kwd``.

    Args:
        name: File name; not used by this parser.
        blob: Raw image bytes.
        **kwargs: Accepted for dispatcher compatibility; not used here.
    """
    from deepdoc.vision import OCR

    self.callback(random.randint(1, 5) / 100.0, "Start to work on an image.")
    conf = self._param.setups["image"]
    # The advertised format is pinned to JSON regardless of the configuration.
    self.set_output("output_format", "json")

    img = Image.open(io.BytesIO(blob)).convert("RGB")
    parse_method = conf["parse_method"]

    if parse_method != "ocr":
        # Any non-"ocr" parse method names the vision model to use.
        lang = conf["lang"]
        cv_model_config = get_model_config_from_provider_instance(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, parse_method)
        cv_model = LLMBundle(self._canvas.get_tenant_id(), cv_model_config, lang=lang)

        # The model receives the image re-encoded as JPEG bytes.
        jpeg_buffer = io.BytesIO()
        img.save(jpeg_buffer, format="JPEG")
        jpeg_bytes = jpeg_buffer.getvalue()

        system_prompt = conf.get("system_prompt")
        if system_prompt:
            txt = cv_model.describe_with_prompt(jpeg_bytes, system_prompt)
        else:
            txt = cv_model.describe(jpeg_bytes)
    else:
        # Characters only: keep the recognised string of every non-empty box.
        boxes = OCR()(np.array(img))
        txt = "\n".join(recognised[0] for _, recognised in boxes if recognised[0])

    chunk = {"text": txt, "image": img, "doc_type_kwd": "image"}
    self.set_output("json", [chunk])
|
|
|
|
def _audio(self, name, blob, **kwargs):
    """Transcribe an audio file with a speech-to-text model.

    Uses the ``audio`` entry of ``self._param.setups``; the model is selected
    by ``conf["vlm"]["llm_id"]``. The blob is written to a temporary file that
    keeps the original extension, because the model is given a file path. The
    transcription is stored under the ``text`` output.

    Args:
        name: File name; only its extension is used.
        blob: Raw audio bytes.
        **kwargs: Accepted for dispatcher compatibility; not used here.
    """
    import os
    import tempfile

    self.callback(random.randint(1, 5) / 100.0, "Start to work on an audio.")

    conf = self._param.setups["audio"]
    vlm = conf.get("vlm")
    self.set_output("output_format", conf["output_format"])

    extension = os.path.splitext(name)[1]
    with tempfile.NamedTemporaryFile(suffix=extension) as audio_file:
        audio_file.write(blob)
        audio_file.flush()
        audio_path = os.path.abspath(audio_file.name)

        # Transcribe while the temporary file still exists.
        model_config = get_model_config_from_provider_instance(self._canvas.get_tenant_id(), LLMType.SPEECH2TEXT, vlm["llm_id"])
        speech_model = LLMBundle(self._canvas.get_tenant_id(), model_config)
        txt = speech_model.transcription(audio_path)

    self.set_output("text", txt)
|
|
|
|
def _video(self, name, blob, **kwargs):
    """Describe a video file with an image-to-text model.

    Uses the ``video`` entry of ``self._param.setups``; the model is selected
    by ``conf["vlm"]["llm_id"]`` and the optional ``prompt`` setting is passed
    as the video prompt (empty string when unset or falsy). The model's reply
    is stored under the ``text`` output.

    Args:
        name: File name, forwarded to the model call as ``filename``.
        blob: Raw video bytes, forwarded as ``video_bytes``.
        **kwargs: Accepted for dispatcher compatibility; not used here.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on an video.")

    conf = self._param.setups["video"]
    vlm = conf.get("vlm")
    self.set_output("output_format", conf["output_format"])

    model_config = get_model_config_from_provider_instance(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, vlm["llm_id"])
    vision_model = LLMBundle(self._canvas.get_tenant_id(), model_config)

    configured_prompt = conf.get("prompt", "")
    video_prompt = str(configured_prompt) if configured_prompt else ""

    # The model API is async; drive it to completion from this sync method.
    request = vision_model.async_chat(
        system="",
        history=[],
        gen_conf={},
        video_bytes=blob,
        filename=name,
        video_prompt=video_prompt,
    )
    txt = asyncio.run(request)

    self.set_output("text", txt)
|
|
|
|
def _email(self, name, blob, **kwargs):
    """Parse an ``.eml`` or ``.msg`` file into structured email content.

    Uses the ``email`` entry of ``self._param.setups``. ``conf["fields"]``
    selects what is extracted: header names (``from``, ``to``, ``cc``,
    ``bcc``, ``date``, ``subject``), ``body`` and ``attachments``. Files whose
    extension is ``.eml`` (case-insensitive) are parsed with the standard
    library ``email`` package; everything else is handed to ``extract_msg``.

    The collected dict is stored as a one-element list under ``json`` when
    ``output_format`` is ``"json"``; otherwise it is flattened into
    ``key:value`` lines and stored under ``text``.

    Args:
        name: File name; only its extension is used.
        blob: Raw file bytes.
        **kwargs: Accepted for dispatcher compatibility; not used here.
    """
    self.callback(random.randint(1, 5) / 100.0, "Start to work on an email.")

    email_content = {}
    conf = self._param.setups["email"]
    self.set_output("output_format", conf["output_format"])
    target_fields = conf["fields"]

    basic_headers = ("from", "to", "cc", "bcc", "date", "subject")

    def _decode_body(payload, charset):
        """Decode body bytes: declared charset first, then common fallbacks."""
        for encoding in (charset, "utf-8", "gb2312", "gbk", "gb18030"):
            try:
                return payload.decode(encoding)
            except (UnicodeDecodeError, LookupError):
                continue
        # latin1 maps every byte value, so this last resort cannot fail.
        return payload.decode("latin1")

    def _bytes_to_text(data, charset=None):
        """Best-effort text for attachment-like content; never raises on binary."""
        if data is None:
            return ""
        if not isinstance(data, (bytes, bytearray)):
            return str(data)
        raw = bytes(data)
        try:
            return raw.decode(charset or "utf-8")
        except (UnicodeDecodeError, LookupError):
            return raw.decode("utf-8", errors="ignore")

    # Compare case-insensitively so "MAIL.EML" is not routed to the .msg parser.
    ext = os.path.splitext(name)[1].lower()
    if ext == ".eml":
        from email import policy
        from email.parser import BytesParser

        msg = BytesParser(policy=policy.default).parse(io.BytesIO(blob))
        email_content["metadata"] = {}

        # Requested headers become top-level fields; headers outside the basic
        # set are kept as metadata; unrequested basic headers are dropped.
        for header, value in msg.items():
            key = header.lower()
            if key in target_fields:
                email_content[key] = value
            elif key not in basic_headers:
                email_content["metadata"][key] = value

        if "body" in target_fields:
            body_text, body_html = [], []

            def _collect_bodies(part):
                """Walk the MIME tree, gathering text/plain and text/html parts."""
                content_type = part.get_content_type()
                if "multipart" in content_type:
                    if part.is_multipart():
                        for sub_part in part.iter_parts():
                            _collect_bodies(sub_part)
                    return
                # Attachments are reported under "attachments", not as body.
                if part.is_attachment():
                    return
                if content_type == "text/plain":
                    bucket = body_text
                elif content_type == "text/html":
                    bucket = body_html
                else:
                    return
                # Read the payload of this part, not of the root message.
                payload = part.get_payload(decode=True)
                if payload is None:
                    return
                bucket.append(_decode_body(payload, part.get_content_charset() or "utf-8"))

            _collect_bodies(msg)

            email_content["text"] = "\n".join(body_text)
            email_content["text_html"] = "\n".join(body_html)

        if "attachments" in target_fields:
            attachments = []
            for part in msg.iter_attachments():
                if part.get_content_disposition() != "attachment":
                    continue
                attachments.append(
                    {
                        "filename": part.get_filename(),
                        # Binary attachments carry no charset; decode leniently.
                        "payload": _bytes_to_text(part.get_payload(decode=True), part.get_content_charset()),
                    }
                )
            email_content["attachments"] = attachments
    else:
        import extract_msg

        msg = extract_msg.Message(blob)

        basic_content = {
            "from": msg.sender,
            "to": msg.to,
            "cc": msg.cc,
            "bcc": msg.bcc,
            "date": msg.date,
            "subject": msg.subject,
        }
        email_content.update({k: v for k, v in basic_content.items() if k in target_fields})

        email_content["metadata"] = {
            "message_id": msg.messageId,
            "in_reply_to": msg.inReplyTo,
        }

        if "body" in target_fields:
            body = msg.body[0] if isinstance(msg.body, list) and msg.body else msg.body
            if not body and msg.htmlBody:
                # Fall back to the HTML body when there is no plain-text body.
                body = msg.htmlBody[0] if isinstance(msg.htmlBody, list) and msg.htmlBody else msg.htmlBody
            if isinstance(body, (bytes, bytearray)):
                # Keep "text" a string so the text output below does not skip it.
                body = _bytes_to_text(body)
            email_content["text"] = body

        if "attachments" in target_fields:
            email_content["attachments"] = [
                {"filename": attachment.name, "payload": _bytes_to_text(attachment.data)}
                for attachment in msg.attachments
            ]

    if conf["output_format"] == "json":
        email_content["doc_type_kwd"] = "text"
        self.set_output("json", [email_content])
        return

    # Text output: strings and dicts become "key:value" lines, attachment
    # dicts become "filename:payload" lines; other value types are skipped.
    pieces = []
    for key, value in email_content.items():
        if isinstance(value, str):
            pieces.append(f"{key}:{value}\n")
        elif isinstance(value, dict):
            pieces.append(f"{key}:{json.dumps(value)}\n")
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, dict):
                    pieces.append(f"{item['filename']}:{item['payload']}\n")
                else:
                    pieces.append(item)
    self.set_output("text", "".join(pieces))
|
|
|
|
def _epub(self, name, blob, **kwargs):
    """Parse an EPUB file with ``EpubParser``.

    Uses the ``epub`` entry of ``self._param.setups``. Non-empty sections are
    stored under ``json`` (one ``{"text", "doc_type_kwd"}`` dict each) when
    ``output_format`` is ``"json"``, otherwise newline-joined under ``text``.

    Args:
        name: File name passed through to the parser.
        blob: File content, passed to the parser as ``binary``.
        **kwargs: Accepted for dispatcher compatibility; not used here.
    """
    from deepdoc.parser import EpubParser

    self.callback(random.randint(1, 5) / 100.0, "Start to work on an EPUB.")
    conf = self._param.setups["epub"]
    self.set_output("output_format", conf["output_format"])

    parsed_sections = EpubParser()(name, binary=blob)
    non_empty = [section for section in parsed_sections if section]

    if conf.get("output_format") == "json":
        self.set_output("json", [{"text": section, "doc_type_kwd": "text"} for section in non_empty])
        return

    self.set_output("text", "\n".join(non_empty))
|
|
|
|
async def _invoke(self, **kwargs):
    """Load the upstream file and run the parser whose suffix list matches it.

    The keyword arguments are validated as ``ParserFromUpstream``; on failure
    the error is stored under the ``_ERROR`` output and the method returns.
    The file content comes from document storage when the canvas has a
    document id, otherwise from ``FileService``. The first setup whose
    ``suffix`` list contains the lower-cased file extension decides which
    parser method runs (in the thread pool). Afterwards every item of the
    ``json`` output is passed to ``image2id`` concurrently.

    Raises:
        Exception: If no setup lists the file's extension, or re-raised from
            a failing ``image2id`` task after the remaining tasks have been
            cancelled.
    """
    function_map = {
        "pdf": self._pdf,
        "markdown": self._markdown,
        "text&code": self._code,
        "html": self._html,
        "spreadsheet": self._spreadsheet,
        "slides": self._slides,
        "doc": self._doc,
        "docx": self._docx,
        "image": self._image,
        "audio": self._audio,
        "video": self._video,
        "email": self._email,
        "epub": self._epub,
    }

    try:
        from_upstream = ParserFromUpstream.model_validate(kwargs)
    except Exception as e:
        self.set_output("_ERROR", f"Input error: {str(e)}")
        return

    name = from_upstream.name
    if self._canvas._doc_id:
        bucket, location = File2DocumentService.get_storage_address(doc_id=self._canvas._doc_id)
        blob = settings.STORAGE_IMPL.get(bucket, location)
    else:
        blob = FileService.get_blob(from_upstream.file["created_by"], from_upstream.file["id"])

    suffix = from_upstream.name.split(".")[-1].lower()
    for p_type, conf in self._param.setups.items():
        if suffix not in conf.get("suffix", []):
            continue

        # name and blob are passed positionally, so drop them from the kwargs.
        call_kwargs = {key: value for key, value in kwargs.items() if key not in ("name", "blob")}
        await thread_pool_exec(function_map[p_type], name, blob, **call_kwargs)
        break
    else:
        # The loop finished without any setup claiming this extension.
        raise Exception("No suitable for file extension: `.%s`" % suffix)

    outs = self.output()
    tasks = [
        asyncio.create_task(image2id(d, partial(settings.STORAGE_IMPL.put, tenant_id=self._canvas._tenant_id), get_uuid()))
        for d in outs.get("json", [])
    ]

    try:
        await asyncio.gather(*tasks, return_exceptions=False)
    except Exception as e:
        logging.error("Error while parsing: %s" % e)
        # Stop the siblings and wait for them to settle before propagating.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
|