Refactor: Task Executor (#15154)

### What problem does this PR solve?

1. Break huge function into smaller pieces
2. Add unit test for the smaller pieces function
3. Layer-ed design
a. infra layer - task_context.py, recording_context.py,
write_operation_interceptor.py, ...
    b. service layer - *_service.py
    c. business layer - task_handler.py
4. Default behavior: use "refactor-ed version" - can switch to original
version by change env variable

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
- [x] Performance Improvement

---------

Co-authored-by: Liu An <asiro@qq.com>
Co-authored-by: Zhichang Yu <yuzhichang@gmail.com>
This commit is contained in:
Jack
2026-05-27 21:54:17 +08:00
committed by GitHub
parent 0071e98c11
commit f0cb7a544b
55 changed files with 12707 additions and 465 deletions

View File

@@ -210,7 +210,7 @@ function task_exe() {
JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so"
while true; do
LD_PRELOAD="$JEMALLOC_PATH" \
"$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}" &
"$PY" rag/svr/task_executor.py -i "${host_id}_${consumer_id}" -t "common" &
wait;
sleep 1;
done

View File

@@ -73,7 +73,7 @@ task_exe(){
local retry_count=0
while ! $STOP && [ $retry_count -lt $MAX_RETRIES ]; do
echo "Starting task_executor.py for task $task_id (Attempt $((retry_count+1)))"
LD_PRELOAD=$JEMALLOC_PATH $PY rag/svr/task_executor.py "$task_id"
LD_PRELOAD=$JEMALLOC_PATH $PY rag/svr/task_executor.py -i "$task_id"
EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
echo "task_executor.py for task $task_id exited successfully."