Fix one data source can be synced to multiple dataset (#16023)

Fix one data source can be synced to multiple dataset
Test add/delete - worked.
This commit is contained in:
Wang Qi
2026-06-15 16:54:25 +08:00
committed by GitHub
parent fa6d29603a
commit f6a2075ad0
4 changed files with 106 additions and 5 deletions

View File

@@ -171,7 +171,7 @@ class ConnectorService(CommonService):
return 0, []
source_type = f"{conn.source}/{conn.id}"
retain_doc_ids = {hash128(f"{connector_id}:{file.id}") for file in file_list}
retain_doc_ids = {doc_id for file in file_list for doc_id in (hash128(f"{connector_id}:{file.id}"), hash128(f"{kb_id}:{connector_id}:{file.id}"))}
existing_docs = DocumentService.list_doc_headers_by_kb_and_source_type(
kb_id,
source_type,

View File

@@ -14,7 +14,7 @@ load_env_file() {
echo "Loading environment variables from: $env_file"
# Source the .env file
set -a
source "$env_file"
source "$env_file"
set +a
else
echo "Warning: .env file not found at: $env_file"

View File

@@ -0,0 +1,98 @@
#!/bin/bash
# Exit immediately if a command exits with a non-zero status
set -e
# Function to load environment variables from .env file
load_env_file() {
# Get the directory of the current script
local script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
local env_file="$script_dir/.env"
# Check if .env file exists
if [ -f "$env_file" ]; then
echo "Loading environment variables from: $env_file"
# Source the .env file
set -a
source "$env_file"
set +a
else
echo "Warning: .env file not found at: $env_file"
fi
}
# Load environment variables
load_env_file
# Unset HTTP proxies that might be set by Docker daemon
export http_proxy=""; export https_proxy=""; export no_proxy=""; export HTTP_PROXY=""; export HTTPS_PROXY=""; export NO_PROXY=""
export PYTHONPATH=$(pwd)
export LD_LIBRARY_PATH=/usr/lib/x86_64-linux-gnu/
JEMALLOC_PATH=$(pkg-config --variable=libdir jemalloc)/libjemalloc.so
PY=python3
# Set default number of workers if WS is not set or less than 1
if [[ -z "$WS" || $WS -lt 1 ]]; then
WS=1
fi
# Maximum number of retries for each task executor and server
MAX_RETRIES=5
# Flag to control termination
STOP=false
# Array to keep track of child PIDs
PIDS=()
# Set the path to the NLTK data directory
export NLTK_DATA="./nltk_data"
# Function to handle termination signals
cleanup() {
echo "Termination signal received. Shutting down..."
STOP=true
# Terminate all child processes
for pid in "${PIDS[@]}"; do
if kill -0 "$pid" 2>/dev/null; then
echo "Killing process $pid"
kill "$pid"
fi
done
exit 0
}
# Trap SIGINT and SIGTERM to invoke cleanup
trap cleanup SIGINT SIGTERM
# Function to execute sync_data_source with retry logic
run_server(){
local retry_count=0
while ! $STOP && [ $retry_count -lt $MAX_RETRIES ]; do
echo "Starting sync_data_source.py (Attempt $((retry_count+1)))"
$PY rag/svr/sync_data_source.py
EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
echo "sync_data_source.py exited successfully."
break
else
echo "sync_data_source.py failed with exit code $EXIT_CODE. Retrying..." >&2
retry_count=$((retry_count + 1))
sleep 2
fi
done
if [ $retry_count -ge $MAX_RETRIES ]; then
echo "sync_data_source.py failed after $MAX_RETRIES attempts. Exiting..." >&2
cleanup
fi
}
# Start the main server
run_server &
PIDS+=($!)
# Wait for all background processes to finish
wait

View File

@@ -232,8 +232,10 @@ class SyncBase:
docs = []
for doc in document_batch:
legacy_doc_id = hash128(f"{task['connector_id']}:{doc.id}")
new_doc_id = hash128(f"{task['kb_id']}:{task['connector_id']}:{doc.id}")
d = {
"id": hash128(f"{task['connector_id']}:{doc.id}"),
"id": legacy_doc_id if legacy_doc_id in existing_doc_ids else new_doc_id,
"connector_id": task["connector_id"],
"source": self.SOURCE_NAME,
"semantic_identifier": doc.semantic_identifier,
@@ -401,8 +403,9 @@ class _BlobLikeBase(SyncBase):
if key_record.deleted:
continue
doc_id = hash128(key_record.key)
stored = existing_fingerprints.get(doc_id, "")
legacy_doc_id = hash128(f"{task['connector_id']}:{key_record.key}")
new_doc_id = hash128(f"{task['kb_id']}:{task['connector_id']}:{key_record.key}")
stored = existing_fingerprints.get(legacy_doc_id, "") or existing_fingerprints.get(new_doc_id, "")
if key_record.fingerprint and stored and key_record.fingerprint == stored:
bypass_count += 1
continue