mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
### What problem does this PR solve? Closes #14773. Today, Pipeline (`rag/flow/`) chunking strategies only run as part of a dataset ingestion that always embeds and indexes the result. There is no way to drive Pipeline-style chunking from an Agent workflow without paying that vectorization/persistence cost. This PR adds a single new Agent component, `PipelineChunker`, that: - Takes one or more file references (from `Begin` / `UserFillUp` uploads) as input. - Runs the existing `rag.app.*` chunking strategies (`naive`, `paper`, `qa`, `manual`, `book`, `presentation`, `laws`, `table`, `one`, `email`, `picture`, `audio`, `resume`, `tag`) against each file. - Emits the resulting chunks as `chunks: list[str]` and `chunks_full: list[dict]` for downstream Agent nodes. - Performs **no embedding and no persistence** — chunks live only in canvas variables for the duration of the run, exactly as requested in the issue. The component is auto-discovered by `agent/component/__init__.py`; no registry edits required. Chunker functions are imported lazily so the component itself does not pull `deepdoc` / OCR / VLM at component-discovery time. File resolution mirrors the existing `ExcelProcessor` convention. Out of scope for this PR (potential follow-ups): - Vectorization / KB persistence (explicit ask in the issue). - Frontend canvas UI for the new component. - Bridging to the newer Pydantic-based `rag/flow/chunker/TokenChunker` (consumes a parser node's structured output rather than a raw file — a separate, larger feature). ### Type of change - [ ] Bug Fix (non-breaking change which fixes an issue) - [x] New Feature (non-breaking change which adds functionality) - [ ] Documentation Update - [ ] Refactoring - [ ] Performance Improvement - [ ] Other (please describe): --- ## Files changed - `agent/component/pipeline_chunker.py` — new component (~180 lines) - `test/unit_test/agent/test_pipeline_chunker.py` — unit tests (~120 lines) ## Test plan - [x] `ruff check` on changed files — clean. - [x] `ruff format` applied to the new component file. - [x] `python -m py_compile` on both new files — both compile. - [x] New unit test file carries `pytestmark = pytest.mark.p2` so it runs under marker-filtered CI. - [x] Every new function, method, and class has a docstring (CodeRabbit 80% docstring-coverage gate). - [x] `python -m pytest test/unit_test/agent/test_pipeline_chunker.py -x -q` — **7 passed in 1.95s** locally. Tests stub `api.db.services.file_service` and `rag.app.*` so they exercise the parameter validation and parser-id lookup table without requiring the full backend / model stack. ## Manual integration plan (post-merge) 1. Drop the component into an Agent canvas after a `Begin` node with a file input. 2. Set `parser_id = "naive"` (or any other strategy) and reference the file input in `inputs`. 3. Wire the `chunks` output into a downstream `LLM` / `Message` / `Iteration` node — chunks are available as plain text without any embedding or KB write. Co-authored-by: John Baillie <johnbaillie2007@gmail.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Zhichang Yu <yuzhichang@gmail.com>
195 lines
7.4 KiB
Python
195 lines
7.4 KiB
Python
#
|
|
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
"""
|
|
PipelineChunker Component
|
|
|
|
Run RAGFlow Pipeline-style chunkers (rag.app.*) against uploaded files inside an
|
|
Agent workflow. Emits plain text chunks for downstream Agent nodes — no
|
|
embedding, no persistence. Wraps existing chunker functions; does not
|
|
re-implement chunking logic.
|
|
"""
|
|
|
|
import importlib
|
|
import logging
|
|
import os
|
|
from abc import ABC
|
|
|
|
from agent.component.base import ComponentBase, ComponentParamBase
|
|
from api.db.services.file_service import FileService
|
|
from common.connection_utils import timeout
|
|
|
|
|
|
# Parser id -> dotted module path under rag.app. Imported lazily so we don't
|
|
# pull deepdoc/OCR/VLM machinery at component-discovery time.
|
|
_PARSER_MODULES: dict[str, str] = {
|
|
"general": "rag.app.naive",
|
|
"naive": "rag.app.naive",
|
|
"paper": "rag.app.paper",
|
|
"book": "rag.app.book",
|
|
"presentation": "rag.app.presentation",
|
|
"manual": "rag.app.manual",
|
|
"laws": "rag.app.laws",
|
|
"qa": "rag.app.qa",
|
|
"table": "rag.app.table",
|
|
"resume": "rag.app.resume",
|
|
"picture": "rag.app.picture",
|
|
"one": "rag.app.one",
|
|
"audio": "rag.app.audio",
|
|
"email": "rag.app.email",
|
|
"tag": "rag.app.tag",
|
|
}
|
|
|
|
|
|
def _load_chunker(parser_id: str):
|
|
"""Resolve a parser id to the underlying ``rag.app.<module>.chunk`` callable."""
|
|
module_path = _PARSER_MODULES[parser_id.lower()]
|
|
return importlib.import_module(module_path).chunk
|
|
|
|
|
|
class PipelineChunkerParam(ComponentParamBase):
|
|
"""
|
|
Define the PipelineChunker component parameters.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialise PipelineChunker defaults and declare component outputs."""
|
|
super().__init__()
|
|
self.inputs = [] # variable references to uploaded files
|
|
self.parser_id = "naive"
|
|
self.lang = "English"
|
|
self.from_page = 0
|
|
self.to_page = 100000000
|
|
self.parser_config = {}
|
|
|
|
self.outputs = {
|
|
"chunks": {"type": "list", "value": []},
|
|
"chunks_full": {"type": "list", "value": []},
|
|
"summary": {"type": "str", "value": ""},
|
|
}
|
|
|
|
def check(self):
|
|
"""Validate parser id, page range, and parser_config shape."""
|
|
self.check_valid_value(
|
|
self.parser_id.lower(),
|
|
"[PipelineChunker] parser_id",
|
|
list(_PARSER_MODULES.keys()),
|
|
)
|
|
self.check_nonnegative_number(self.from_page, "[PipelineChunker] from_page")
|
|
self.check_nonnegative_number(self.to_page, "[PipelineChunker] to_page")
|
|
if isinstance(self.from_page, (int, float)) and isinstance(self.to_page, (int, float)) and self.from_page > self.to_page:
|
|
raise ValueError("[PipelineChunker] from_page must be <= to_page")
|
|
if not isinstance(self.parser_config, dict):
|
|
raise ValueError("[PipelineChunker] parser_config must be a dict.")
|
|
return True
|
|
|
|
|
|
class PipelineChunker(ComponentBase, ABC):
|
|
"""
|
|
Run a Pipeline-style chunker (naive, paper, qa, manual, book, ...) against
|
|
one or more uploaded files and surface the resulting chunks to downstream
|
|
Agent nodes.
|
|
"""
|
|
|
|
component_name = "PipelineChunker"
|
|
|
|
def get_input_form(self) -> dict[str, dict]:
|
|
"""Expose each referenced file input as a file-typed form element."""
|
|
res = {}
|
|
for ref in self._param.inputs or []:
|
|
for k, o in self.get_input_elements_from_text(ref).items():
|
|
res[k] = {"name": o.get("name", ""), "type": "file"}
|
|
return res
|
|
|
|
def _get_file_content(self, file_ref: str) -> tuple[bytes | None, str | None]:
|
|
"""Resolve a canvas variable reference to ``(content_bytes, filename)``."""
|
|
value = self._canvas.get_variable_value(file_ref)
|
|
if value is None:
|
|
return None, None
|
|
|
|
if isinstance(value, list) and value:
|
|
value = value[0]
|
|
|
|
if isinstance(value, dict):
|
|
file_id = value.get("id") or value.get("file_id")
|
|
created_by = value.get("created_by") or self._canvas.get_tenant_id()
|
|
filename = value.get("name") or value.get("filename") or "uploaded"
|
|
if file_id:
|
|
try:
|
|
return FileService.get_blob(created_by, file_id), filename
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"[PipelineChunker] FileService.get_blob failed for "
|
|
f"file_id={file_id} created_by={created_by} filename={filename}: {e}"
|
|
)
|
|
return None, None
|
|
return None, None
|
|
|
|
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10 * 60)))
|
|
def _invoke(self, **kwargs):
|
|
"""Run the configured chunker over every referenced file and publish outputs."""
|
|
if self.check_if_canceled("PipelineChunker processing"):
|
|
return
|
|
|
|
chunker = _load_chunker(self._param.parser_id)
|
|
tenant_id = self._canvas.get_tenant_id()
|
|
chunk_kwargs = dict(
|
|
lang=self._param.lang,
|
|
tenant_id=tenant_id,
|
|
from_page=self._param.from_page,
|
|
to_page=self._param.to_page,
|
|
parser_config=self._param.parser_config or {},
|
|
callback=lambda prog=0, msg="": logging.info(f"[PipelineChunker] {prog}: {msg}"),
|
|
)
|
|
|
|
all_chunks: list[dict] = []
|
|
per_file_counts: list[str] = []
|
|
|
|
for file_ref in self._param.inputs or []:
|
|
if self.check_if_canceled("PipelineChunker processing"):
|
|
return
|
|
|
|
content, filename = self._get_file_content(file_ref)
|
|
self.set_input_value(file_ref, filename or "")
|
|
if content is None:
|
|
logging.warning(f"[PipelineChunker] could not resolve file ref: {file_ref}")
|
|
per_file_counts.append(f"{filename or file_ref}: error (could not resolve file)")
|
|
continue
|
|
|
|
try:
|
|
file_chunks = chunker(filename, binary=content, **chunk_kwargs) or []
|
|
except Exception as e:
|
|
logging.exception(e)
|
|
per_file_counts.append(f"{filename}: error (chunking failed)")
|
|
continue
|
|
|
|
all_chunks.extend(file_chunks)
|
|
per_file_counts.append(f"{filename}: {len(file_chunks)} chunks")
|
|
|
|
text_only = [(c.get("content_with_weight") or c.get("text") or "") for c in all_chunks if isinstance(c, dict)]
|
|
text_only = [t for t in text_only if t]
|
|
|
|
self.set_output("chunks", text_only)
|
|
self.set_output("chunks_full", all_chunks)
|
|
self.set_output(
|
|
"summary",
|
|
f"Parser: {self._param.parser_id} | Files: {len(self._param.inputs or [])} | Chunks: {len(text_only)}" + (" | " + "; ".join(per_file_counts) if per_file_counts else ""),
|
|
)
|
|
|
|
def thoughts(self) -> str:
|
|
"""Return a short status line for UI display."""
|
|
return f"Chunking with `{self._param.parser_id}` strategy..."
|