diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index 9fa868c603..8d2b015010 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -171,7 +171,7 @@ class ConnectorService(CommonService): return 0, [] source_type = f"{conn.source}/{conn.id}" - retain_doc_ids = {hash128(f"{connector_id}:{file.id}") for file in file_list} + retain_doc_ids = {doc_id for file in file_list for doc_id in (hash128(f"{connector_id}:{file.id}"), hash128(f"{kb_id}:{connector_id}:{file.id}"))} existing_docs = DocumentService.list_doc_headers_by_kb_and_source_type( kb_id, source_type, diff --git a/docker/launch_backend_service.sh b/docker/launch_backend_service.sh index ac2ec5e4ab..e5b9411f58 100755 --- a/docker/launch_backend_service.sh +++ b/docker/launch_backend_service.sh @@ -14,7 +14,7 @@ load_env_file() { echo "Loading environment variables from: $env_file" # Source the .env file set -a - source "$env_file" + source "$env_file" set +a else echo "Warning: .env file not found at: $env_file" diff --git a/docker/launch_data_sync_service.sh b/docker/launch_data_sync_service.sh new file mode 100755 index 0000000000..fa4f3cf91c --- /dev/null +++ b/docker/launch_data_sync_service.sh @@ -0,0 +1,98 @@ +#!/bin/bash + +# Exit immediately if a command exits with a non-zero status +set -e + +# Function to load environment variables from .env file +load_env_file() { + # Get the directory of the current script + local script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + local env_file="$script_dir/.env" + + # Check if .env file exists + if [ -f "$env_file" ]; then + echo "Loading environment variables from: $env_file" + # Source the .env file + set -a + source "$env_file" + set +a + else + echo "Warning: .env file not found at: $env_file" + fi +} + +# Load environment variables +load_env_file + +# Unset HTTP proxies that might be set by Docker daemon +export http_proxy=""; export https_proxy=""; export no_proxy=""; export HTTP_PROXY=""; export HTTPS_PROXY=""; export NO_PROXY="" +export PYTHONPATH=$(pwd) + +export LD_LIBRARY_PATH=/usr/lib/x86_64-linux-gnu/ +JEMALLOC_PATH=$(pkg-config --variable=libdir jemalloc)/libjemalloc.so + +PY=python3 + +# Set default number of workers if WS is not set or less than 1 +if [[ -z "$WS" || $WS -lt 1 ]]; then + WS=1 +fi + +# Maximum number of retries for each task executor and server +MAX_RETRIES=5 + +# Flag to control termination +STOP=false + +# Array to keep track of child PIDs +PIDS=() + +# Set the path to the NLTK data directory +export NLTK_DATA="./nltk_data" + +# Function to handle termination signals +cleanup() { + echo "Termination signal received. Shutting down..." + STOP=true + # Terminate all child processes + for pid in "${PIDS[@]}"; do + if kill -0 "$pid" 2>/dev/null; then + echo "Killing process $pid" + kill "$pid" + fi + done + exit 0 +} + +# Trap SIGINT and SIGTERM to invoke cleanup +trap cleanup SIGINT SIGTERM + +# Function to execute sync_data_source with retry logic +run_server(){ + local retry_count=0 + while ! $STOP && [ $retry_count -lt $MAX_RETRIES ]; do + echo "Starting sync_data_source.py (Attempt $((retry_count+1)))" + $PY rag/svr/sync_data_source.py + EXIT_CODE=$? + if [ $EXIT_CODE -eq 0 ]; then + echo "sync_data_source.py exited successfully." + break + else + echo "sync_data_source.py failed with exit code $EXIT_CODE. Retrying..." >&2 + retry_count=$((retry_count + 1)) + sleep 2 + fi + done + + if [ $retry_count -ge $MAX_RETRIES ]; then + echo "sync_data_source.py failed after $MAX_RETRIES attempts. Exiting..." >&2 + cleanup + fi +} + +# Start the main server +run_server & +PIDS+=($!) + +# Wait for all background processes to finish +wait diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 23d62a5bf9..fcd682d772 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -232,8 +232,10 @@ class SyncBase: docs = [] for doc in document_batch: + legacy_doc_id = hash128(f"{task['connector_id']}:{doc.id}") + new_doc_id = hash128(f"{task['kb_id']}:{task['connector_id']}:{doc.id}") d = { - "id": hash128(f"{task['connector_id']}:{doc.id}"), + "id": legacy_doc_id if legacy_doc_id in existing_doc_ids else new_doc_id, "connector_id": task["connector_id"], "source": self.SOURCE_NAME, "semantic_identifier": doc.semantic_identifier, @@ -401,8 +403,9 @@ class _BlobLikeBase(SyncBase): if key_record.deleted: continue - doc_id = hash128(key_record.key) - stored = existing_fingerprints.get(doc_id, "") + legacy_doc_id = hash128(f"{task['connector_id']}:{key_record.key}") + new_doc_id = hash128(f"{task['kb_id']}:{task['connector_id']}:{key_record.key}") + stored = existing_fingerprints.get(legacy_doc_id, "") or existing_fingerprints.get(new_doc_id, "") if key_record.fingerprint and stored and key_record.fingerprint == stored: bypass_count += 1 continue