mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
### What problem does this PR solve?
1. Break the huge function into smaller pieces.
2. Add unit tests for the smaller functions.
3. Layered design:
a. infra layer - task_context.py, recording_context.py,
write_operation_interceptor.py, ...
b. service layer - *_service.py
c. business layer - task_handler.py
4. Default behavior: use the refactored version; you can switch back to the original
version by changing an environment variable.
### Type of change
- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
- [x] Performance Improvement
---------
Co-authored-by: Liu An <asiro@qq.com>
Co-authored-by: Zhichang Yu <yuzhichang@gmail.com>
137 lines
4.6 KiB
Python
137 lines
4.6 KiB
Python
#
|
|
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Chunk Builder Module.
|
|
|
|
Provides parser factory and document chunking logic:
|
|
- Parser module registration and selection
|
|
- Document chunking via parser
|
|
- PDF outline extraction
|
|
"""
|
|
|
|
import logging
|
|
from timeit import default_timer as timer
|
|
from typing import Dict, List
|
|
|
|
from common.constants import ParserType
|
|
from common.misc_utils import thread_pool_exec
|
|
from rag.svr.task_executor_refactor.task_context import TaskContext
|
|
|
|
from api.db.services.doc_metadata_service import DocMetadataService
|
|
from common.metadata_utils import update_metadata_to
|
|
from rag.utils.table_es_metadata import merge_table_parser_config_from_kb
|
|
|
|
|
|
def get_parser(parser_id: str):
    """Resolve a parser identifier to the ``rag.app`` module that chunks it.

    The identifier is lower-cased before lookup. Both ``"general"`` and the
    knowledge-graph parser type map to the ``naive`` module.

    Args:
        parser_id: The parser identifier, e.g. a ``ParserType`` value.

    Returns:
        The parser module registered for ``parser_id``.

    Raises:
        KeyError: If the lower-cased ``parser_id`` has no registered module.
    """
    # Function-scope import (kept from the original); the import order below
    # is preserved on purpose so module import side effects happen unchanged.
    from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture, naive, one, audio, email, tag

    # Entry order is preserved so that, should two keys ever compare equal,
    # the same (later) entry wins as before.
    registry = dict(
        [
            ("general", naive),
            (ParserType.NAIVE.value, naive),
            (ParserType.PAPER.value, paper),
            (ParserType.BOOK.value, book),
            (ParserType.PRESENTATION.value, presentation),
            (ParserType.MANUAL.value, manual),
            (ParserType.LAWS.value, laws),
            (ParserType.QA.value, qa),
            (ParserType.TABLE.value, table),
            (ParserType.RESUME.value, resume),
            (ParserType.PICTURE.value, picture),
            (ParserType.ONE.value, one),
            (ParserType.AUDIO.value, audio),
            (ParserType.EMAIL.value, email),
            (ParserType.KG.value, naive),
            (ParserType.TAG.value, tag),
        ]
    )

    lookup_key = parser_id.lower()
    return registry[lookup_key]
|
|
|
|
|
|
async def run_chunking(
    chunker,
    binary: bytes,
    ctx: TaskContext,
) -> List[Dict]:
    """Chunk a document's bytes with the given parser module.

    The table parser config is first merged from the knowledge base via
    ``merge_table_parser_config_from_kb(ctx.raw_task)``. Then, while holding
    ``ctx.chunk_limiter``, ``chunker.chunk`` is executed through
    ``thread_pool_exec``. On success, the elapsed time is logged and the
    merged config is recorded under ``"parser_config_after_merge"``.

    Args:
        chunker: The parser module whose ``chunk`` function is invoked.
        binary: Binary content of the document.
        ctx: TaskContext supplying the document name, page range, language,
            progress callback, KB/tenant ids and recording context.

    Returns:
        The list of chunk dicts returned by ``chunker.chunk``.

    Raises:
        Any exception from merging or chunking is re-raised after progress
        -1 is reported through ``ctx.progress_cb`` and the error is logged.
    """
    started_at = timer()
    try:
        merged_config = merge_table_parser_config_from_kb(ctx.raw_task)

        async with ctx.chunk_limiter:
            # ctx attributes are read only after the limiter is acquired,
            # exactly as in the original call site.
            parse = chunker.chunk
            chunk_kwargs = dict(
                binary=binary,
                from_page=ctx.from_page,
                to_page=ctx.to_page,
                lang=ctx.language,
                callback=ctx.progress_cb,
                kb_id=ctx.kb_id,
                parser_config=merged_config,
                tenant_id=ctx.tenant_id,
            )
            chunks = await thread_pool_exec(parse, ctx.name, **chunk_kwargs)

        elapsed = timer() - started_at
        logging.info("Chunking({}) {}/{} done".format(elapsed, ctx.location, ctx.name))
        ctx.recording_context.record("parser_config_after_merge", merged_config)
        return chunks
    except Exception as err:
        # Single quotes are stripped from the message before reporting it.
        reported = str(err).replace("'", "")
        ctx.progress_cb(-1, msg="Internal server error while chunking: %s" % reported)
        logging.exception("Chunking {}/{} got exception".format(ctx.location, ctx.name))
        raise
|
|
|
|
|
|
async def extract_outline(cks: List[Dict], ctx: TaskContext) -> None:
    """Strip the ``__outline__`` marker from the first chunk and persist it.

    The outline value found on ``cks[0]`` (or ``None`` when there are no
    chunks or no marker) is recorded under ``"outline_data"``. The marker key
    is always removed from the chunk dict, even when its value is empty.
    A non-empty outline is then written into the document metadata:
    - If ``ctx.write_interceptor`` is set, the write is only reported via
      ``intercept("DocMetadataService.update_document_metadata")``.
    - Otherwise the existing metadata is fetched, merged with
      ``{"outline": outline}`` and saved.

    Failures while persisting are logged as warnings and not re-raised.

    Args:
        cks: List of chunk dictionaries; ``cks[0]`` is mutated in place.
        ctx: TaskContext providing ``doc_id``, ``recording_context`` and
            the optional ``write_interceptor``.
    """
    # Always pop: previously a present-but-empty outline (e.g. []) was left on
    # the chunk, leaking the internal marker key into the chunk dict.
    outline = cks[0].pop("__outline__", None) if cks else None
    ctx.recording_context.record("outline_data", outline)

    if not outline:
        return

    try:
        if ctx.write_interceptor:
            ctx.write_interceptor.intercept("DocMetadataService.update_document_metadata")
        else:
            temp_doc = DocMetadataService.get_document_metadata(ctx.doc_id) or {}
            DocMetadataService.update_document_metadata(
                ctx.doc_id,
                update_metadata_to({"outline": outline}, temp_doc),
            )

        logging.info("Persisted PDF outline (%d entries) for doc %s", len(outline), ctx.doc_id)
    except Exception as e:
        # Best-effort: a failed outline write must not fail the chunking task.
        logging.warning("Failed to persist PDF outline for doc %s: %s", ctx.doc_id, e)
|