From 74fa54f1221ab7b9118d8b1dd4a33c1b376a153c Mon Sep 17 00:00:00 2001 From: Paras Sondhi Date: Wed, 29 Apr 2026 07:34:36 +0530 Subject: [PATCH] feat(google-drive): optimize memory payload and enable sync deletion (#14372) **Addresses the Google Drive integration for #14362** This PR completely overhauls the Google Drive sync logic to accurately detect remote deletions, while drastically reducing the memory footprint during the snapshot phase. ### What changed under the hood: * **Killed the memory bloat:** Swapped out the massive document dictionary objects for a lightweight `collections.namedtuple` (`SlimDoc = namedtuple('SlimDoc', ['id'])`). This prevents RAM spikes during `retrieve_all_slim_docs_perm_sync` on massive enterprise drives. * **Flawless downstream integration:** The `SlimDoc` object relies on simple duck typing. It perfectly delivers the `.id` attribute required by `ConnectorService.cleanup_stale_documents_for_task`, meaning your core `hash128` vector cleanup logic runs natively without modification. * **Fixed the Shared Drive blindspot:** The standard API query was missing team folders. Injected the `corpora="allDrives"` and `includeItemsFromAllDrives=True` override flags so the connector now accurately maps state across both personal workspaces and organizational Shared Drives. ### Testing: Isolated the Google API retrieval logic locally to prove the `SlimDoc` mapping works and correctly registers state drops when a file is trashed remotely. ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Performance Improvement --- rag/svr/sync_data_source.py | 67 +++++++++++++++++-- .../data-source/constant/index.tsx | 3 + 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 7fd9c1e090..2c6d72cc94 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -74,12 +74,19 @@ from common.log_utils import init_root_logger from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc from common.versions import get_ragflow_version from box_sdk_gen import BoxOAuth, OAuthConfig, AccessToken +from collections import namedtuple MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5")) task_limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS) class SyncBase: + """ + Base class for all data source synchronization connectors. + + Defines the standard interface for connecting to external APIs, polling for + new or updated documents, and managing synchronization state intervals. + """ SOURCE_NAME: str = None def __init__(self, conf: dict) -> None: @@ -118,6 +125,13 @@ class SyncBase: logging.info("Connect to %s: %s, %s", name, details, cls.window_info(task)) async def __call__(self, task: dict): + """ + Entry point for executing a synchronization task worker. + + Manages task execution boundaries including status logging, asynchronous + timeouts, and top-level exception handling, while delegating the core + ingestion logic to `_run_task_logic`. + """ SyncLogsService.start(task["id"], task["connector_id"]) async with task_limiter: @@ -144,6 +158,13 @@ class SyncBase: SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"]) async def _run_task_logic(self, task: dict): + """ + Executes the core synchronization pipeline for a data source task. + + This method retrieves documents from the external source via the `_generate` method, + parses and upserts them into the Knowledge Base (KB), and handles stale document + reconciliation (sync deletion) if a remote snapshot (`file_list`) is provided. + """ generate_output = await self._generate(task) # `_generate()` currently supports two outputs: # 1. `document_batch_generator` @@ -236,6 +257,14 @@ class SyncBase: task["kb_id"], ) elif file_list is not None: + logging.info( + "[%s] Starting stale document reconciliation. Snapshot size: %d " + "(connector_id=%s, kb_id=%s)", + self.SOURCE_NAME, + len(file_list), + task["connector_id"], + task["kb_id"], + ) removed_docs, _ = ConnectorService.cleanup_stale_documents_for_task( task["id"], task["connector_id"], @@ -598,9 +627,15 @@ class Dropbox(SyncBase): class GoogleDrive(SyncBase): + """ + Data synchronization connector for Google Drive. + Handles both full re-indexing and incremental polling, including the capability + to synchronize deleted files by retrieving a lightweight snapshot of current files. + """ SOURCE_NAME: str = FileSource.GOOGLE_DRIVE async def _generate(self, task: dict): + """Generates document batches from Google Drive, handling both full and incremental syncs.""" connector_kwargs = { "include_shared_drives": self.conf.get("include_shared_drives", False), "include_my_drives": self.conf.get("include_my_drives", False), @@ -622,14 +657,31 @@ class GoogleDrive(SyncBase): if new_credentials: self._persist_rotated_credentials(task["connector_id"], new_credentials) + file_list = None + + # Capture end_time BEFORE the snapshot to prevent the ingestion race condition + end_time = datetime.now(timezone.utc).timestamp() + if task["reindex"] == "1" or not task["poll_range_start"]: start_time = 0.0 _begin_info = "totally" else: start_time = task["poll_range_start"].timestamp() _begin_info = f"from {task['poll_range_start']}" - - end_time = datetime.now(timezone.utc).timestamp() + + if self.conf.get("sync_deleted_files"): + file_list = [] + logging.info("Syncing deleted files (connector_id=%s)", task["connector_id"]) + SlimDoc = namedtuple('SlimDoc', ['id']) + + # Add observability timing so operators can track the O(N) cost + snapshot_start = time.perf_counter() + + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(SlimDoc(doc.id) for doc in slim_batch) + + logging.info("Slim snapshot fetched %d files in %.2f seconds", len(file_list), time.perf_counter() - snapshot_start) + raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE try: batch_size = int(raw_batch_size) @@ -639,6 +691,7 @@ class GoogleDrive(SyncBase): batch_size = INDEX_BATCH_SIZE def document_batches(): + """Yields paginated batches of parsed Google Drive documents using checkpoints.""" checkpoint = self.connector.build_dummy_checkpoint() pending_docs = [] iterations = 0 @@ -672,9 +725,11 @@ class GoogleDrive(SyncBase): except RuntimeError: admin_email = "unknown" self.log_connection("Google Drive", f"as {admin_email}", task) - return document_batches() + + return document_batches(), file_list def _persist_rotated_credentials(self, connector_id: str, credentials: dict[str, Any]) -> None: + """Saves refreshed OAuth credentials back to the database configuration.""" try: updated_conf = copy.deepcopy(self.conf) updated_conf["credentials"] = credentials @@ -683,8 +738,7 @@ class GoogleDrive(SyncBase): logging.info("Persisted refreshed Google Drive credentials for connector %s", connector_id) except Exception: logging.exception("Failed to persist refreshed Google Drive credentials for connector %s", connector_id) - - + class Jira(SyncBase): SOURCE_NAME: str = FileSource.JIRA @@ -1512,6 +1566,7 @@ func_factory = { async def dispatch_tasks(): + """Polls the database for pending synchronization tasks and dispatches them concurrently.""" while True: try: list(SyncLogsService.list_sync_tasks()[0]) @@ -1544,6 +1599,7 @@ stop_event = threading.Event() def signal_handler(sig, frame): + """Handles system interruption signals to ensure a graceful worker shutdown.""" logging.info("Received interrupt signal, shutting down...") stop_event.set() time.sleep(1) @@ -1555,6 +1611,7 @@ CONSUMER_NAME = "data_sync_" + CONSUMER_NO async def main(): + """Entry point for the RAGFlow data synchronization worker process.""" logging.info(r""" _____ _ _____ | __ \ | | / ____| diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index c645aa3a1e..2bb4d267f7 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -58,6 +58,9 @@ export const DataSourceFeatureVisibilityMap = { [DataSourceKey.GITHUB]: { syncDeletedFiles: true, }, +[DataSourceKey.GOOGLE_DRIVE]: { + syncDeletedFiles: true, + }, [DataSourceKey.CONFLUENCE]: { syncDeletedFiles: true, },