ragflow/rag/svr/task_executor_refactor/embedding_service.py
Jack b363146997 refactor: overhaul task executor with layered architecture and comprehensive test suite (#15471)
## Summary

Decomposes the monolithic `task_executor.py` (1945 lines) into a 6-layer
architecture with clear separation of concerns. The refactored code is
functionally equivalent to the original, verified through 400 passing
tests and a production-vs-dry-run comparison framework.

## Architecture

```
entry (task_manager)
  └─ orchestration (task_handler)
       ├─ services (chunk_service, embedding_service, dataflow_service, raptor_service, post_processor)
       │    └─ utilities (chunk_builder, chunk_post_processor, embedding_utils)
       └─ infrastructure (task_context, recording_context, interceptor)
```

Key design decisions:
- **TaskContext** — typed facade over raw task dict, injects rate
limiters + callbacks via composition
- **RecordingContext + Comparator** — enables side-by-side production vs
dry-run execution for safe migration
- **NullRecordingContext** — zero-allocation no-op for production, uses
`__slots__`
- **WriteOperationInterceptor** — FIFO replay of the previous run's
function return values in comparison mode
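
The recording pair described above can be pictured roughly as follows. This is a minimal sketch, not the code in this PR: the method names `record` and `values` and the helper `count_tokens` are assumptions made for illustration. It only shows how an empty `__slots__` removes the per-instance `__dict__` from the no-op variant.

```python
from typing import Any, Dict, List


class RecordingContext:
    """Collects named intermediate values (hypothetical interface)."""

    def __init__(self) -> None:
        self._values: Dict[str, List[Any]] = {}

    def record(self, key: str, value: Any) -> None:
        # Keep every value recorded under a key, in call order.
        self._values.setdefault(key, []).append(value)

    def values(self, key: str) -> List[Any]:
        return list(self._values.get(key, []))


class NullRecordingContext:
    """No-op stand-in for production runs (hypothetical interface).

    An empty __slots__ means instances carry no per-instance __dict__.
    """

    __slots__ = ()

    def record(self, key: str, value: Any) -> None:
        # Intentionally discard the value.
        return None

    def values(self, key: str) -> List[Any]:
        return []


def count_tokens(texts: List[str], rec) -> int:
    # Business code records unconditionally; the context object decides
    # whether anything is actually stored.
    total = sum(len(t.split()) for t in texts)
    rec.record("token_count", total)
    return total
```

With this shape, calling code never branches on the run mode; swapping the context object is enough.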

## Migration Strategy

The original `handle_task()` in `task_executor.py` uses a 3-way switch
via `TE_RUN_MODE`:
- `TE_RUN_MODE=0` (default) → runs refactored code
- `TE_RUN_MODE=1` → runs both original + refactored, compares all
intermediate results
- `TE_RUN_MODE=2` → runs original code (fallback)
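
The three-way switch can be sketched as below. Only the variable name `TE_RUN_MODE` and the meaning of its three values come from this description; the function names, the synchronous signatures, and the choice to reject other values are assumptions.

```python
import os
from typing import Any, Callable, Mapping, Optional


def select_run_mode(env: Optional[Mapping[str, str]] = None) -> int:
    """Read TE_RUN_MODE, falling back to 0 (refactored code) when unset."""
    env = os.environ if env is None else env
    mode = int(env.get("TE_RUN_MODE", "0"))
    if mode not in (0, 1, 2):
        # Assumption: unknown values are rejected rather than ignored.
        raise ValueError(f"unsupported TE_RUN_MODE: {mode}")
    return mode


def dispatch(
    task: dict,
    original: Callable[[dict], Any],
    refactored: Callable[[dict], Any],
    compare_both: Callable[[dict], Any],
    env: Optional[Mapping[str, str]] = None,
) -> Any:
    mode = select_run_mode(env)
    if mode == 0:
        return refactored(task)  # default path
    if mode == 2:
        return original(task)  # fallback path
    # mode == 1: run both implementations and compare intermediate results
    return compare_both(task)
```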

The comparison mode (`TE_RUN_MODE=1`) records ~40 intermediate values
(chunks, vectors, token counts, func return values) from the production
run and replays them during dry-run, then uses `ContextComparator` to
report mismatches.
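
A rough picture of the record, replay, and compare cycle, under stated assumptions: `ReplayInterceptor` and `compare` are illustrative stand-ins, not the `WriteOperationInterceptor` or `ContextComparator` classes from this PR, and their method names are invented for the sketch.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List


class ReplayInterceptor:
    """Records return values in one run and replays them FIFO in the next."""

    def __init__(self) -> None:
        self._recorded: Dict[str, Deque[Any]] = {}

    def record(self, name: str, fn: Callable[..., Any], *args: Any) -> Any:
        # Production run: execute the real function and remember its result.
        result = fn(*args)
        self._recorded.setdefault(name, deque()).append(result)
        return result

    def replay(self, name: str) -> Any:
        # Dry run: hand back the oldest recorded result instead of calling
        # the function, so side-effecting writes are not repeated.
        return self._recorded[name].popleft()


def compare(prod: Dict[str, Any], dry: Dict[str, Any]) -> List[str]:
    """Return the sorted keys whose values differ between the two runs."""
    return sorted(k for k in prod.keys() | dry.keys() if prod.get(k) != dry.get(k))
```

Replaying in FIFO order matters when the same function is called several times per task: the n-th dry-run call then sees the n-th production result.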

## Functional Equivalence Fixes

All divergences between original and refactored code were identified and
fixed:
- Timeout decorators (handle/build_chunks/raptor/embedding)
- NullRecordingContext leak in finally block causing RuntimeError
- MinIO None-binary check with proper FileNotFoundError
- Dataflow dispatch after embedding binding + init_kb
- Memory task missing return after processing
- RAPTOR checkpoint progress reporting
- Tag cache (get_tags_from_cache/set_tags_to_cache) restoration
- dataflow_id correction in _load_dsl
- Language default Chinese, dead code guard removal
- embed_chunks made async with proper thread_pool_exec
- Full GraphRAG default configuration (10 parameters)
- Hardcoded q_768_vec fallback removal in RAPTOR
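
To illustrate the sync-to-async change for `embed_chunks`, here is a minimal sketch of the pattern: a blocking encode call is moved to a worker thread while a limiter bounds concurrency. It uses `asyncio.to_thread` and `asyncio.Semaphore` as stand-ins for the project's `thread_pool_exec` and embedding limiter, and `fake_encode` is a hypothetical model, so treat it as an analogy rather than the actual implementation.

```python
import asyncio
from typing import List, Tuple


def fake_encode(texts: List[str]) -> Tuple[List[List[float]], int]:
    """Blocking stand-in for a model's encode(): one 2-d vector per text."""
    vectors = [[float(len(t)), 1.0] for t in texts]
    tokens = sum(len(t.split()) for t in texts)
    return vectors, tokens


async def embed_in_batches(
    texts: List[str], batch_size: int, limiter: asyncio.Semaphore
) -> Tuple[List[List[float]], int]:
    vectors: List[List[float]] = []
    token_count = 0
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        async with limiter:
            # Run the blocking call in a worker thread so the event loop
            # stays free for other tasks.
            vts, used = await asyncio.to_thread(fake_encode, batch)
        vectors.extend(vts)
        token_count += used
    return vectors, token_count


async def main() -> Tuple[List[List[float]], int]:
    limiter = asyncio.Semaphore(1)  # at most one encode call in flight
    return await embed_in_batches(["a b", "c", "d e f"], 2, limiter)
```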

## Test Changes

- 20 new tests covering table parser manual mode, tag cache, embedding
edge cases, RAPTOR checkpoint, dataflow_id correction, storage binary
None, cancel cleanup, metadata=None boundary
- Unified `make_task_context`/`make_task_dict` factories eliminated 10+
duplicated helpers
- DataflowService tests migrated from internal method mocks to IO
boundary mocks (real orchestration code executes)
- Parametrized duplicate build_chunks post-processor tests
- 7 raptor tests modernized to @pytest.mark.asyncio
- Mock count per test reduced through boundary-level mocking strategy
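
A unified factory of the kind mentioned above might look like the following. The name `make_task_dict` appears in this description, but its signature and every field in the default dict are assumptions chosen for illustration.

```python
import copy
from typing import Any, Dict

# Hypothetical defaults; the real task dict has its own fields.
_DEFAULT_TASK: Dict[str, Any] = {
    "id": "task-1",
    "doc_id": "doc-1",
    "language": "Chinese",
    "parser_config": {},
}


def make_task_dict(**overrides: Any) -> Dict[str, Any]:
    """Build a task dict for tests, overriding only the fields a test cares about."""
    # Deep-copy so nested defaults are never shared between tests.
    task = copy.deepcopy(_DEFAULT_TASK)
    task.update(overrides)
    return task
```

Each test then states only what differs from the defaults, for example `make_task_dict(language="English")`, which is what lets many per-file helpers collapse into one.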

**Test count: 400 passing, 0 warnings, 0 skips**

## Files Changed

| File | Change |
|------|--------|
| `rag/svr/task_executor.py` | +1 line (NullRecordingContext fix) |
| `rag/svr/task_executor_refactor/task_handler.py` | Orchestration layer, 8 logic fixes |
| `rag/svr/task_executor_refactor/chunk_service.py` | +timeout + None-check |
| `rag/svr/task_executor_refactor/embedding_service.py` | sync→async rewrite |
| `rag/svr/task_executor_refactor/dataflow_service.py` | dataflow_id fix + timeout |
| `rag/svr/task_executor_refactor/raptor_service.py` | checkpoint fix + assert |
| `rag/svr/task_executor_refactor/chunk_post_processor.py` | tag cache restore |
| `rag/svr/task_executor_refactor/task_context.py` | language default fix |
| `test/.../conftest.py` | +294 lines shared helpers |
| `test/.../*.py` | 15 test files refactored, 20 new tests |

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 17:18:31 +08:00

123 lines
4.4 KiB
Python

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Embedding Service Module.

Provides [`EmbeddingService`](rag/svr/task_executor_refactor/embedding_service.py:42) for vector embedding operations.
"""

from typing import Any, Dict, List, Tuple

import numpy as np

from common import settings
from common.misc_utils import thread_pool_exec
from common.token_utils import truncate
from rag.svr.task_executor_refactor.embedding_utils import EmbeddingUtils
from rag.svr.task_executor_refactor.task_context import TaskContext


class EmbeddingService:
    """Service for vector embedding operations.

    This service handles:
    - Batch encoding of text chunks
    - Title + content vector combination
    - Embedding model rate limiting

    All intermediate results are recorded via RecordingContext for comparison.
    """

    def __init__(
        self,
        ctx: TaskContext,
        embedding_batch_size: int = None,
    ):
        """Initialize EmbeddingService.

        Args:
            ctx: TaskContext containing task configuration and execution resources.
            embedding_batch_size: Batch size for embedding operations.
                Falls back to settings.EMBEDDING_BATCH_SIZE when not given.
        """
        self._task_context = ctx
        self._embedding_batch_size = embedding_batch_size or settings.EMBEDDING_BATCH_SIZE

    async def embed_chunks(
        self,
        docs: List[Dict[str, Any]],
        embedding_model,
        parser_config: Dict = None,
    ) -> Tuple[int, int]:
        """Embed a list of chunks.

        Args:
            docs: List of chunk dictionaries to embed.
            embedding_model: The embedding model bundle (LLMBundle).
            parser_config: Parser configuration for filename embedding weight.

        Returns:
            Tuple of (token_count, vector_size).
        """
        if parser_config is None:
            parser_config = {}

        # Split each chunk into title text and content text
        titles, contents = EmbeddingUtils.prepare_texts_for_embedding(docs)

        # Encode only the first title and tile its vector across all chunks
        tk_count = 0
        if len(titles) > 0 and len(titles) == len(contents):
            async with self._task_context.embed_limiter:
                vts, c = await thread_pool_exec(embedding_model.encode, titles[0:1])
            tts = np.tile(vts[0], (len(contents), 1))
            tk_count += c
        else:
            tts = None

        # Encode contents in batches, truncating each text to fit the model's max length
        vects_batches = []
        for i in range(0, len(contents), self._embedding_batch_size):
            batch = contents[i: i + self._embedding_batch_size]
            async with self._task_context.embed_limiter:
                vts, c = await thread_pool_exec(
                    self._batch_encode_wrapper,
                    [truncate(t, embedding_model.max_length - 10) for t in batch],
                    embedding_model,
                )
            vects_batches.append(vts)
            tk_count += c
            if self._task_context.progress_cb:
                self._task_context.progress_cb(prog=0.7 + 0.2 * (i + 1) / len(contents), msg="")

        # Stack the per-batch vectors into a single array
        cnts = EmbeddingUtils.stack_vectors(vects_batches)

        # Combine title and content vectors using the configured title weight
        title_weight = parser_config.get("filename_embd_weight", EmbeddingUtils.DEFAULT_TITLE_WEIGHT)
        vects = EmbeddingUtils.combine_title_content_vectors(tts, cnts, title_weight)
        assert len(vects) == len(docs)

        # Attach the vectors to the chunk dicts and get the vector size
        vector_size = EmbeddingUtils.attach_vectors(docs, vects)
        return tk_count, vector_size

    @staticmethod
    def _batch_encode_wrapper(txts: List[str], embedding_model) -> Tuple[np.ndarray, int]:
        """Synchronous wrapper for batch encoding, used with thread_pool_exec."""
        return embedding_model.encode(txts)