ragflow/rag/svr/task_executor_refactor/chunk_service.py
Jack b363146997 refactor: overhaul task executor with layered architecture and comprehensive test suite (#15471)
## Summary

Decomposes the monolithic `task_executor.py` (1945 lines) into a 6-layer
architecture with clear separation of concerns. The refactored code is
functionally equivalent to the original, verified through 400 passing
tests and a production-vs-dry-run comparison framework.

## Architecture

```
entry (task_manager)
  └─ orchestration (task_handler)
       ├─ services (chunk_service, embedding_service, dataflow_service, raptor_service, post_processor)
       │    └─ utilities (chunk_builder, chunk_post_processor, embedding_utils)
       └─ infrastructure (task_context, recording_context, interceptor)
```

Key design decisions:
- **TaskContext**: typed facade over the raw task dict; injects rate
limiters and callbacks via composition
- **RecordingContext + Comparator**: enables side-by-side production vs
dry-run execution for safe migration
- **NullRecordingContext**: zero-allocation no-op for production, uses
`__slots__`
- **WriteOperationInterceptor**: FIFO replay of a previous run's function
return values for comparison mode
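
The recording and replay pieces in the list above can be pictured with a
minimal sketch. This is not the code from this PR: the class bodies, the
`record`/`intercept` signatures, and the meaning of the optional `default`
argument are assumptions inferred from how `chunk_service.py` calls them.

```python
from collections import defaultdict, deque
from typing import Any, Deque, Dict

_MISSING = object()  # sentinel so that None can be a legitimate recorded value


class RecordingContext:
    """Hypothetical recorder: stores intermediate values per key, in call order."""

    def __init__(self) -> None:
        self.records: Dict[str, Deque[Any]] = defaultdict(deque)

    def record(self, key: str, value: Any) -> None:
        self.records[key].append(value)


class NullRecordingContext:
    """Hypothetical no-op recorder for production runs.

    The empty __slots__ means instances carry no per-instance __dict__.
    """

    __slots__ = ()

    def record(self, key: str, value: Any) -> None:
        return None


class WriteOperationInterceptor:
    """Hypothetical FIFO replay of return values captured in an earlier run."""

    def __init__(self, recorded: Dict[str, Deque[Any]]) -> None:
        self._recorded = recorded

    def intercept(self, name: str, default: Any = _MISSING) -> Any:
        # Assumption: an explicit default bypasses replay, which suits steps
        # whose output is non-deterministic and cannot be compared anyway.
        if default is not _MISSING:
            return default
        queue = self._recorded.get(name)
        if not queue:
            raise LookupError(f"no recorded return value left for {name!r}")
        return queue.popleft()


# Production run: record what each real write returned.
prod = RecordingContext()
prod.record("docStoreConn.insert", [])        # first batch: no errors
prod.record("docStoreConn.insert", ["boom"])  # second batch: an error list

# Dry run: replay those values in the same order instead of writing.
replay = WriteOperationInterceptor(prod.records)
first = replay.intercept("docStoreConn.insert")
second = replay.intercept("docStoreConn.insert")
```

Replaying in FIFO order keeps the dry run's control flow aligned with the
production run without touching the document store a second time.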

## Migration Strategy

The original `handle_task()` in `task_executor.py` uses a 3-way switch
via `TE_RUN_MODE`:
- `TE_RUN_MODE=0` (default) → runs refactored code
- `TE_RUN_MODE=1` → runs both original + refactored, compares all
intermediate results
- `TE_RUN_MODE=2` → runs original code (fallback)

The comparison mode (`TE_RUN_MODE=1`) records ~40 intermediate values
(chunks, vectors, token counts, func return values) from the production
run and replays them during dry-run, then uses `ContextComparator` to
report mismatches.
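
The three-way switch described above might look roughly like the sketch
below. Only the `TE_RUN_MODE` values come from this PR; `compare_records`,
the callable parameters, and the choice of the original code as the
"production" side in mode 1 are assumptions made for illustration.

```python
import os
from typing import Any, Callable, Dict, List, Optional

Records = Dict[str, Any]
_MISSING = object()  # distinguishes "key absent" from "value is None"


def compare_records(prod: Records, dry: Records) -> List[str]:
    """Return the sorted keys whose recorded values differ between two runs."""
    keys = prod.keys() | dry.keys()
    return sorted(k for k in keys if prod.get(k, _MISSING) != dry.get(k, _MISSING))


def handle_task(
    task: Dict[str, Any],
    run_original: Callable[[Dict[str, Any]], Records],
    run_refactored: Callable[[Dict[str, Any]], Records],
    mode: Optional[str] = None,
) -> List[str]:
    """Dispatch on TE_RUN_MODE and return mismatched keys (only mode 1 can report any)."""
    if mode is None:
        mode = os.environ.get("TE_RUN_MODE", "0")
    if mode == "0":  # default: refactored code only
        run_refactored(task)
        return []
    if mode == "2":  # fallback: original code only
        run_original(task)
        return []
    if mode == "1":  # run both and compare the recorded intermediate values
        prod = run_original(task)
        dry = run_refactored(task)
        return compare_records(prod, dry)
    raise ValueError(f"unsupported TE_RUN_MODE: {mode!r}")


mismatches = handle_task(
    {"doc_id": "d1"},
    run_original=lambda t: {"token_count": 12, "chunk_count": 3},
    run_refactored=lambda t: {"token_count": 12, "chunk_count": 4},
    mode="1",
)
print(mismatches)  # ['chunk_count']
```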

## Functional Equivalence Fixes

All divergences between original and refactored code were identified and
fixed:
- Timeout decorators (handle/build_chunks/raptor/embedding)
- NullRecordingContext leak in finally block causing RuntimeError
- MinIO None-binary check with proper FileNotFoundError
- Dataflow dispatch after embedding binding + init_kb
- Memory task missing return after processing
- RAPTOR checkpoint progress reporting
- Tag cache (get_tags_from_cache/set_tags_to_cache) restoration
- dataflow_id correction in _load_dsl
- Language default Chinese, dead code guard removal
- embed_chunks made async with proper thread_pool_exec
- Full GraphRAG default configuration (10 parameters)
- Hardcoded q_768_vec fallback removal in RAPTOR
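
As an illustration of the `embed_chunks` item above, the sketch below
offloads a blocking embedding call to a worker thread so the event loop
stays responsive. It is an assumption-laden stand-in: `encode_batch` is a
hypothetical model call, and `asyncio.to_thread` is used here in place of
the project's own `thread_pool_exec` helper, whose signature is not shown
in this PR description.

```python
import asyncio
import time
from typing import List


def encode_batch(texts: List[str]) -> List[List[float]]:
    """Hypothetical blocking embedding call; returns one tiny vector per text."""
    time.sleep(0.01)  # simulate model latency
    return [[float(len(t))] for t in texts]


async def embed_chunks(texts: List[str], batch_size: int = 2) -> List[List[float]]:
    """Embed texts batch by batch without blocking the event loop."""
    vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        # Run the blocking call in a worker thread and await its result.
        vectors.extend(await asyncio.to_thread(encode_batch, batch))
    return vectors


vectors = asyncio.run(embed_chunks(["a", "bb", "ccc"]))
print(vectors)  # [[1.0], [2.0], [3.0]]
```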

## Test Changes

- 20 new tests covering table parser manual mode, tag cache, embedding
edge cases, RAPTOR checkpoint, dataflow_id correction, storage binary
None, cancel cleanup, metadata=None boundary
- Unified `make_task_context`/`make_task_dict` factories eliminated 10+
duplicated helpers
- DataflowService tests migrated from internal method mocks to IO
boundary mocks (real orchestration code executes)
- Parametrized duplicate build_chunks post-processor tests
- 7 raptor tests modernized to @pytest.mark.asyncio
- Mock count per test reduced through boundary-level mocking strategy
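
The boundary-level mocking strategy mentioned above can be sketched as
follows. `DataflowServiceSketch` and its two injected callables are
hypothetical and do not mirror the real `DataflowService`; the point is
only that the mocks sit at the IO edges while the orchestration code in
between runs for real.

```python
import asyncio
from unittest.mock import AsyncMock


class DataflowServiceSketch:
    """Hypothetical service with real orchestration between two IO boundaries."""

    def __init__(self, load_dsl, store_insert):
        self._load_dsl = load_dsl          # IO boundary: fetch the pipeline definition
        self._store_insert = store_insert  # IO boundary: write to the document store

    async def run(self, dataflow_id: str) -> int:
        dsl = await self._load_dsl(dataflow_id)
        # Real logic under test: build chunk dicts from the loaded definition.
        chunks = [
            {"id": f"{dataflow_id}-{i}", "text": text}
            for i, text in enumerate(dsl["texts"])
        ]
        await self._store_insert(chunks)
        return len(chunks)


# Only the two boundaries are mocked; run() itself executes unmodified.
load_dsl = AsyncMock(return_value={"texts": ["alpha", "beta"]})
store_insert = AsyncMock(return_value=None)
service = DataflowServiceSketch(load_dsl, store_insert)
inserted = asyncio.run(service.run("flow1"))

load_dsl.assert_awaited_once_with("flow1")
store_insert.assert_awaited_once()
```

Two mocks cover the whole test here, whereas mocking each internal method
would need one mock per step and would skip the logic being verified.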

**Test count: 400 passing, 0 warnings, 0 skips**

## Files Changed

| File | Change |
|------|--------|
| `rag/svr/task_executor.py` | +1 line (NullRecordingContext fix) |
| `rag/svr/task_executor_refactor/task_handler.py` | Orchestration
layer, 8 logic fixes |
| `rag/svr/task_executor_refactor/chunk_service.py` | +timeout +
None-check |
| `rag/svr/task_executor_refactor/embedding_service.py` | sync→async
rewrite |
| `rag/svr/task_executor_refactor/dataflow_service.py` | dataflow_id fix
+ timeout |
| `rag/svr/task_executor_refactor/raptor_service.py` | checkpoint fix +
assert |
| `rag/svr/task_executor_refactor/chunk_post_processor.py` | tag cache
restore |
| `rag/svr/task_executor_refactor/task_context.py` | language default
fix |
| `test/.../conftest.py` | +294 lines shared helpers |
| `test/.../*.py` | 15 test files refactored, 20 new tests |

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 17:18:31 +08:00

483 lines
18 KiB
Python

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Chunk Service Module.
Provides [`ChunkService`](rag/svr/task_executor_refactor/chunk_service.py:50) for document chunking,
post-processing (keywords, questions, metadata, tags), MinIO upload, and chunk insertion into document store.
This module orchestrates the chunk building pipeline by delegating to:
- [`chunk_builder`](rag/svr/task_executor_refactor/chunk_builder.py): Parser selection and document chunking
- [`chunk_post_processor`](rag/svr/task_executor_refactor/chunk_post_processor.py): Post-processing functions
"""
import asyncio
import copy
import logging
from datetime import datetime
from functools import partial
from timeit import default_timer as timer
from typing import Any, Dict, List
import xxhash
from common import settings
from common.connection_utils import timeout
from common.constants import PAGERANK_FLD, TAG_FLD
from common.misc_utils import thread_pool_exec
from common.float_utils import normalize_overlapped_percent
from rag.nlp import search
from rag.svr.task_executor_refactor.task_context import TaskContext
from rag.utils.base64_image import image2id
from api.db.services.task_service import TaskService
from rag.svr.task_executor_refactor.constants import GRAPH_RAPTOR_FAKE_DOC_ID
# Re-export for backward compatibility
from rag.svr.task_executor_refactor.chunk_builder import (
    get_parser,
    run_chunking,
    extract_outline,
)
from rag.svr.task_executor_refactor.chunk_post_processor import (
    extract_keywords,
    generate_questions,
    generate_metadata,
    apply_tags,
)


class ChunkService:
    """Service for document chunking and post-processing.

    This service handles:
    - Document chunking via parser modules (delegated to chunk_builder)
    - MinIO upload of chunk images
    - Keyword extraction (delegated to chunk_post_processor)
    - Question generation (delegated to chunk_post_processor)
    - Metadata generation (delegated to chunk_post_processor)
    - Content tagging (delegated to chunk_post_processor)
    - Table of contents generation
    - Chunk insertion into document store

    All intermediate results are recorded via RecordingContext for comparison.
    """

    def __init__(
        self,
        ctx: TaskContext,
    ):
        """Initialize ChunkService.

        Args:
            ctx: TaskContext containing task configuration and execution resources.
        """
        self._task_context = ctx

    @timeout(60 * 80, 1)
    async def build_chunks(
        self,
        storage_binary: bytes,
    ) -> List[Dict[str, Any]]:
        """Build chunks from document binary.

        This is the main entry point for chunk building. It orchestrates:
        1. File size validation
        2. Parser selection and chunking (delegated to chunk_builder)
        3. Outline extraction (delegated to chunk_builder)
        4. MinIO upload
        5. Post-processing (delegated to chunk_post_processor)

        Args:
            storage_binary: Binary content of the document.

        Returns:
            List of chunk dictionaries ready for embedding.
        """
        ctx = self._task_context
        # Validate file size
        if ctx.size > settings.DOC_MAXIMUM_SIZE:
            self._progress(prog=-1, msg="File size exceeds( <= %dMb )" %
                           (int(settings.DOC_MAXIMUM_SIZE / 1024 / 1024)))
            self._task_context.recording_context.record("file_size_exceeded", True)
            return []
        ctx.recording_context.record("file_size_exceeded", False)
        ctx.recording_context.record("parser_id", ctx.parser_id)
        # Get parser
        chunker = get_parser(ctx.parser_id)
        # record config for compare
        chunk_config = {
            "parser_id": ctx.parser_id,
            "chunk_token_num": ctx.parser_config.get("chunk_token_num", 128),
            "overlapped_percent": normalize_overlapped_percent(
                ctx.parser_config.get("overlapped_percent", 0)
            ),
            "delimiter": ctx.parser_config.get("delimiter", "\n!?。;!?"),
            "from_page": ctx.from_page,
            "to_page": ctx.to_page,
            "language": ctx.language,
            "layout_recognizer": ctx.parser_config.get("layout_recognizer"),
        }
        ctx.recording_context.record("chunk_config", chunk_config)
        # Run chunking (delegated)
        cks = await run_chunking(chunker, storage_binary, ctx)
        # Record raw chunks
        self._task_context.recording_context.record("raw_chunks", cks)
        # Extract outline (delegated)
        await extract_outline(cks, ctx)
        # Prepare docs and upload to MinIO
        docs = await self._prepare_docs_and_upload(cks)
        # Record docs after prep
        self._task_context.recording_context.record("docs_after_prep", docs)
        # Post-processing (delegated to chunk_post_processor)
        if ctx.parser_config.get("auto_keywords", 0):
            await extract_keywords(docs, ctx)
            keywords = [d for d in docs if d.get("important_kwd")]
            self._task_context.recording_context.record("keywords_extracted", keywords)
        if ctx.parser_config.get("auto_questions", 0):
            await generate_questions(docs, ctx)
            questions = [d for d in docs if d.get("question_kwd")]
            self._task_context.recording_context.record("questions_generated", questions)
        if ctx.parser_config.get("enable_metadata", False) and (
            ctx.parser_config.get("metadata") or ctx.parser_config.get("built_in_metadata")
        ):
            await generate_metadata(docs, ctx)
            metadata_list = [d for d in docs if d.get("metadata_obj")]
            self._task_context.recording_context.record("metadata_list_generated", metadata_list)
        if ctx.kb_parser_config.get("tag_kb_ids", []):
            await apply_tags(docs, ctx)
            tags_applied = [d for d in docs if d.get(TAG_FLD)]
            self._task_context.recording_context.record("tags_applied", tags_applied)
        # Record final chunks
        self._task_context.recording_context.record("final_chunks", docs)
        final_chunk_ids = [c.get("id") for c in docs if isinstance(c, dict) and "id" in c]
        self._task_context.recording_context.record("final_chunk_ids_count", len(final_chunk_ids))
        return docs

    async def _prepare_docs_and_upload(self, cks: List[Dict]) -> List[Dict]:
        """Prepare docs and upload images to MinIO."""
        ctx = self._task_context
        docs = []
        doc = {
            "doc_id": ctx.doc_id,
            "kb_id": str(ctx.kb_id)
        }
        if ctx.pagerank:
            doc[PAGERANK_FLD] = int(ctx.pagerank)
        st = timer()

        @timeout(60)
        async def upload_to_minio(document, chunk):
            try:
                d = copy.deepcopy(document)
                d.update(chunk)
                d["id"] = xxhash.xxh64(
                    (chunk["content_with_weight"] + str(d["doc_id"])).encode("utf-8", "surrogatepass")).hexdigest()
                d["create_time"] = str(datetime.now()).replace("T", " ")[:19]
                d["create_timestamp_flt"] = datetime.now().timestamp()
                if d.get("img_id"):
                    docs.append(d)
                    return
                if not d.get("image"):
                    _ = d.pop("image", None)
                    d["img_id"] = ""
                    docs.append(d)
                    return
                await image2id(d, partial(settings.STORAGE_IMPL.put, tenant_id=ctx.tenant_id), d["id"], ctx.kb_id)
                docs.append(d)
            except Exception:
                logging.exception(
                    "Saving image of chunk {}/{}/{} got exception".format(ctx.location, ctx.name, d["id"]))
                raise

        tasks = []
        for ck in cks:
            tasks.append(asyncio.create_task(upload_to_minio(doc, ck)))
        try:
            await asyncio.gather(*tasks, return_exceptions=False)
        except Exception as e:
            logging.error(f"MINIO PUT({ctx.name}) got exception: {e}")
            for t in tasks:
                t.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
        el = timer() - st
        logging.info("MINIO PUT({}) cost {:.3f} s".format(ctx.name, el))
        return docs

    def _progress(self, prog=None, msg=None):
        """Progress callback helper."""
        if prog is not None or msg is not None:
            self._task_context.progress_cb(prog=prog, msg=msg)

    # =========================================================================
    # Insert Service Methods (merged from insert_service.py)
    # =========================================================================
    async def insert_chunks(
        self,
        task_id: str,
        task_tenant_id: str,
        task_dataset_id: str,
        chunks: List[Dict[str, Any]],
        doc_bulk_size: int = None,
    ) -> bool:
        """Insert chunks into document store.

        Args:
            task_id: Task identifier.
            task_tenant_id: Tenant ID.
            task_dataset_id: Dataset/knowledge base ID.
            chunks: List of chunk dictionaries to insert.
            doc_bulk_size: Batch size for document store inserts.

        Returns:
            True if all chunks were inserted successfully, False otherwise.
        """
        doc_bulk_size = doc_bulk_size or settings.DOC_BULK_SIZE
        # Create mother chunks (summary chunks)
        mothers = self._create_mother_chunks(chunks)
        # Insert mother chunks
        if not await self._insert_mother_chunks(task_id, task_tenant_id, task_dataset_id, mothers, doc_bulk_size):
            return False
        # Insert main chunks
        return await self._insert_main_chunks(task_id, task_tenant_id, task_dataset_id, chunks, doc_bulk_size)

    @classmethod
    def _create_mother_chunks(cls, chunks: List[Dict]) -> List[Dict]:
        """Create mother chunks from summary fields.

        Mother chunks are summary/abstract chunks that are stored separately.
        """
        mothers = []
        mother_ids = set()
        for ck in chunks:
            mom = ck.get("mom") or ck.get("mom_with_weight") or ""
            if not mom:
                continue
            mom_id = xxhash.xxh64(mom.encode("utf-8")).hexdigest()
            ck["mom_id"] = mom_id
            if mom_id in mother_ids:
                continue
            mother_ids.add(mom_id)
            mom_ck = copy.deepcopy(ck)
            mom_ck["id"] = mom_id
            mom_ck["content_with_weight"] = mom
            mom_ck["available_int"] = 0
            # Keep only essential fields
            allowed_fields = [
                "id", "content_with_weight", "doc_id", "docnm_kwd",
                "kb_id", "available_int", "position_int",
                "create_timestamp_flt", "page_num_int", "top_int"
            ]
            for fld in list(mom_ck.keys()):
                if fld not in allowed_fields:
                    del mom_ck[fld]
            mothers.append(mom_ck)
        return mothers

    async def _insert_mother_chunks(
        self,
        task_id: str,
        task_tenant_id: str,
        task_dataset_id: str,
        mothers: List[Dict],
        doc_bulk_size: int,
    ) -> bool:
        """Insert mother chunks in batches."""
        for b in range(0, len(mothers), doc_bulk_size):
            await self._intercept_doc_store_insert(
                mothers[b:b + doc_bulk_size],
                search.index_name(task_tenant_id),
                task_dataset_id
            )
            if self._task_context.has_canceled_func(task_id):
                self._task_context.progress_cb(-1, msg="Task has been canceled.")
                return False
        return True

    async def _intercept_doc_store_delete(self, condition: dict, index_name: str, task_dataset_id: str) -> Any:
        if self._task_context.write_interceptor:
            return self._task_context.write_interceptor.intercept("docStoreConn.delete")
        else:
            return await thread_pool_exec(settings.docStoreConn.delete, condition, index_name, task_dataset_id)

    async def _intercept_doc_store_insert(self, chunks: list, index_name: str, task_dataset_id: str) -> Any:
        if self._task_context.write_interceptor:
            if self._task_context.doc_id == GRAPH_RAPTOR_FAKE_DOC_ID:  # RAPTOR output is non-deterministic
                return self._task_context.write_interceptor.intercept("docStoreConn.insert", [])
            return self._task_context.write_interceptor.intercept("docStoreConn.insert")
        else:
            return await thread_pool_exec(settings.docStoreConn.insert, chunks, index_name, task_dataset_id)

    async def _insert_main_chunks(
        self,
        task_id: str,
        task_tenant_id: str,
        task_dataset_id: str,
        chunks: List[Dict],
        doc_bulk_size: int,
    ) -> bool:
        """Insert main chunks in batches with cancellation handling."""
        for b in range(0, len(chunks), doc_bulk_size):
            doc_store_result = await self._intercept_doc_store_insert(
                chunks[b:b + doc_bulk_size],
                search.index_name(task_tenant_id),
                task_dataset_id
            )
            if self._task_context.has_canceled_func(task_id):
                # Roll back partial RAPTOR summary inserts
                await self._rollback_raptor_chunks(
                    task_id, task_tenant_id, task_dataset_id, chunks, b, doc_bulk_size
                )
                self._task_context.progress_cb(-1, msg="Task has been canceled.")
                return False
            if b % 128 == 0:
                self._task_context.progress_cb(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
            if doc_store_result:
                error_message = (
                    f"Insert chunk error: {doc_store_result}, "
                    "please check log file and Elasticsearch/Infinity status!"
                )
                self._task_context.progress_cb(-1, msg=error_message)
                raise Exception(error_message)
            # Update chunk IDs in task
            chunk_ids = [chunk["id"] for chunk in chunks[:b + doc_bulk_size]]
            if not await self._update_task_chunk_ids(task_id, chunk_ids):
                # Roll back on failure
                await self._rollback_insertion(task_tenant_id, task_dataset_id, chunk_ids)
                self._task_context.progress_cb(
                    -1,
                    msg=f"Chunk updates failed since task {task_id} is unknown."
                )
                return False
        return True

    async def _rollback_raptor_chunks(
        self,
        task_id: str,
        task_tenant_id: str,
        task_dataset_id: str,
        chunks: List[Dict],
        up_to_batch: int,
        doc_bulk_size: int,
    ):
        """Roll back partial RAPTOR summary inserts after cancellation."""
        raptor_ids = [
            c["id"] for c in chunks[:up_to_batch + doc_bulk_size]
            if c.get("raptor_kwd") == "raptor"
        ]
        if raptor_ids:
            try:
                await self._intercept_doc_store_delete(
                    {"id": raptor_ids}, search.index_name(task_tenant_id), task_dataset_id
                )
                logging.info(
                    "insert_chunks: rolled back %d partial RAPTOR chunks after cancellation (task=%s)",
                    len(raptor_ids), task_id,
                )
            except Exception:
                logging.exception(
                    "insert_chunks: failed to roll back partial RAPTOR chunks after cancellation (task=%s)",
                    task_id,
                )

    async def _update_task_chunk_ids(self, task_id: str, chunk_ids: List[str]) -> bool:
        """Update chunk IDs in the task record."""
        from peewee import DoesNotExist
        try:
            if self._task_context.write_interceptor:
                if self._task_context.doc_id == GRAPH_RAPTOR_FAKE_DOC_ID:
                    self._task_context.write_interceptor.intercept("TaskService.update_chunk_ids", True)
                else:
                    self._task_context.write_interceptor.intercept("TaskService.update_chunk_ids")
            else:
                TaskService.update_chunk_ids(task_id, " ".join(chunk_ids))
            return True
        except DoesNotExist:
            logging.warning(f"do_handle_task update_chunk_ids failed since task {task_id} is unknown.")
            return False

    async def _rollback_insertion(
        self,
        task_tenant_id: str,
        task_dataset_id: str,
        chunk_ids: List[str],
    ):
        """Roll back an insertion by deleting chunks and images."""
        await self._intercept_doc_store_delete(
            {"id": chunk_ids}, search.index_name(task_tenant_id), task_dataset_id
        )
        # Delete associated images
        tasks = []
        for chunk_id in chunk_ids:
            tasks.append(asyncio.create_task(self._delete_image(task_dataset_id, chunk_id)))
        try:
            await asyncio.gather(*tasks, return_exceptions=False)
        except Exception as e:
            logging.error(f"delete_image failed: {e}")
            for t in tasks:
                t.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise

    async def _delete_image(self, kb_id: str, chunk_id: str):
        """Delete a chunk's image from storage."""
        try:
            async with self._task_context.minio_limiter:
                settings.STORAGE_IMPL.delete(kb_id, chunk_id)
        except Exception:
            logging.exception(f"Deleting image of chunk {chunk_id} got exception")
            raise