From 2bc8c6d35e7d62e50b49fd1f27325aabac9a4974 Mon Sep 17 00:00:00 2001 From: bitloi <89318445+bitloi@users.noreply.github.com> Date: Wed, 29 Apr 2026 08:05:11 -0300 Subject: [PATCH] feat(dropbox): support deleted-file sync (#14476) ### What problem does this PR solve? Partially addresses #14362 by adding deleted-file sync support for the Dropbox data source. Dropbox previously did not provide the slim current-file snapshot required by stale document reconciliation, and its sync runner returned only document batches. As a result, enabling deleted-file sync could not remove local documents that had been deleted from Dropbox. This PR: - Adds `retrieve_all_slim_docs_perm_sync()` to `DropboxConnector`. - Reuses Dropbox metadata traversal to collect current remote file IDs without downloading file contents. - Wires incremental Dropbox sync to return `(document_generator, file_list)` when `sync_deleted_files` is enabled. - Enables the deleted-file sync toggle for Dropbox in the data source settings UI. - Adds regression coverage for slim snapshots, nested folders, paginated listings, duplicate filenames, and full reindex behavior. Tests: - `uv run pytest test/unit_test/common/test_dropbox_connector.py -q` - `uv run pytest test/unit_test/rag/test_sync_data_source.py -q` - `uv run pytest test/unit_test/common/test_dropbox_connector.py test/unit_test/rag/test_sync_data_source.py -q` - `uv run ruff check common/data_source/dropbox_connector.py rag/svr/sync_data_source.py test/unit_test/common/test_dropbox_connector.py test/unit_test/rag/test_sync_data_source.py` - `./node_modules/.bin/eslint src/pages/user-setting/data-source/constant/index.tsx` ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- common/data_source/dropbox_connector.py | 102 +++++++----- rag/svr/sync_data_source.py | 16 +- .../common/test_dropbox_connector.py | 157 ++++++++++++++++++ test/unit_test/rag/test_sync_data_source.py | 93 +++++++++++ .../data-source/constant/index.tsx | 3 + 5 files changed, 321 insertions(+), 50 deletions(-) create mode 100644 test/unit_test/common/test_dropbox_connector.py diff --git a/common/data_source/dropbox_connector.py b/common/data_source/dropbox_connector.py index 0e7131d8f3..43ab08f4b0 100644 --- a/common/data_source/dropbox_connector.py +++ b/common/data_source/dropbox_connector.py @@ -14,14 +14,14 @@ from common.data_source.exceptions import ( ConnectorValidationError, InsufficientPermissionsError, ) -from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch -from common.data_source.models import Document, GenerateDocumentsOutput +from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch, SlimConnectorWithPermSync +from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SlimDocument from common.data_source.utils import get_file_ext logger = logging.getLogger(__name__) -class DropboxConnector(LoadConnector, PollConnector): +class DropboxConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): """Dropbox connector for accessing Dropbox files and folders""" def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None: @@ -87,57 +87,48 @@ class DropboxConnector(LoadConnector, PollConnector): if self.dropbox_client is None: raise ConnectorMissingCredentialError("Dropbox") - # Collect all files first to count filename occurrences - all_files = [] - self._collect_files_recursive(path, start, end, all_files) - + all_files: list[FileMetadata] = [] + self._collect_file_entries_recursive(path, start, end, all_files) + # Count filename occurrences filename_counts: dict[str, int] = {} - for entry, _ in all_files: + for entry in all_files: filename_counts[entry.name] = filename_counts.get(entry.name, 0) + 1 - + # Process files in batches batch: list[Document] = [] - for entry, downloaded_file in all_files: - modified_time = entry.client_modified - if modified_time.tzinfo is None: - modified_time = modified_time.replace(tzinfo=timezone.utc) - else: - modified_time = modified_time.astimezone(timezone.utc) - - # Use full path only if filename appears multiple times - if filename_counts.get(entry.name, 0) > 1: - # Remove leading slash and replace slashes with ' / ' - relative_path = entry.path_display.lstrip('/') - semantic_id = relative_path.replace('/', ' / ') if relative_path else entry.name - else: - semantic_id = entry.name - + for entry in all_files: + try: + downloaded_file = self._download_file(entry.path_display) + except Exception: + logger.exception(f"[Dropbox]: Error downloading file {entry.path_display}") + continue + batch.append( Document( id=f"dropbox:{entry.id}", blob=downloaded_file, source=DocumentSource.DROPBOX, - semantic_identifier=semantic_id, + semantic_identifier=self._get_semantic_identifier(entry, filename_counts), extension=get_file_ext(entry.name), - doc_updated_at=modified_time, + doc_updated_at=self._normalize_modified_time(entry.client_modified), size_bytes=entry.size if getattr(entry, "size", None) is not None else len(downloaded_file), ) ) - + if len(batch) == self.batch_size: yield batch batch = [] - + if batch: yield batch - def _collect_files_recursive( + def _collect_file_entries_recursive( self, path: str, start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None, - all_files: list, + all_files: list[FileMetadata], ) -> None: """Recursively collect all files matching time criteria.""" if self.dropbox_client is None: @@ -152,33 +143,56 @@ class DropboxConnector(LoadConnector, PollConnector): while True: for entry in result.entries: if isinstance(entry, FileMetadata): - modified_time = entry.client_modified - if modified_time.tzinfo is None: - modified_time = modified_time.replace(tzinfo=timezone.utc) - else: - modified_time = modified_time.astimezone(timezone.utc) - - time_as_seconds = modified_time.timestamp() + time_as_seconds = self._normalize_modified_time(entry.client_modified).timestamp() if start is not None and time_as_seconds <= start: continue if end is not None and time_as_seconds > end: continue - try: - downloaded_file = self._download_file(entry.path_display) - all_files.append((entry, downloaded_file)) - except Exception: - logger.exception(f"[Dropbox]: Error downloading file {entry.path_display}") - continue + all_files.append(entry) elif isinstance(entry, FolderMetadata): - self._collect_files_recursive(entry.path_lower, start, end, all_files) + self._collect_file_entries_recursive(entry.path_lower, start, end, all_files) if not result.has_more: break result = self.dropbox_client.files_list_folder_continue(result.cursor) + def _normalize_modified_time(self, modified_time): + if modified_time.tzinfo is None: + return modified_time.replace(tzinfo=timezone.utc) + return modified_time.astimezone(timezone.utc) + + def _get_semantic_identifier(self, entry: FileMetadata, filename_counts: dict[str, int]) -> str: + if filename_counts.get(entry.name, 0) <= 1: + return entry.name + + relative_path = entry.path_display.lstrip("/") + return relative_path.replace("/", " / ") if relative_path else entry.name + + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + del callback + + if self.dropbox_client is None: + raise ConnectorMissingCredentialError("Dropbox") + + all_files: list[FileMetadata] = [] + self._collect_file_entries_recursive("", None, None, all_files) + + batch: list[SlimDocument] = [] + for entry in all_files: + batch.append(SlimDocument(id=f"dropbox:{entry.id}")) + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch + def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput: """Poll Dropbox for recent file changes""" if self.dropbox_client is None: diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 81ab42e7be..5d36a957f5 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -617,19 +617,23 @@ class Dropbox(SyncBase): async def _generate(self, task: dict): self.connector = DropboxConnector(batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE)) self.connector.load_credentials(self.conf["credentials"]) + poll_start = task["poll_range_start"] + file_list = None - if task["reindex"] == "1" or not task["poll_range_start"]: + if task["reindex"] == "1" or not poll_start: document_generator = self.connector.load_from_state() _begin_info = "totally" else: - poll_start = task["poll_range_start"] - document_generator = self.connector.poll_source( - poll_start.timestamp(), datetime.now(timezone.utc).timestamp() - ) + end_time = datetime.now(timezone.utc).timestamp() + if self.conf.get("sync_deleted_files"): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + document_generator = self.connector.poll_source(poll_start.timestamp(), end_time) _begin_info = f"from {poll_start}" self.log_connection("Dropbox", "workspace", task) - return document_generator + return document_generator, file_list class GoogleDrive(SyncBase): diff --git a/test/unit_test/common/test_dropbox_connector.py b/test/unit_test/common/test_dropbox_connector.py new file mode 100644 index 0000000000..f9976b5977 --- /dev/null +++ b/test/unit_test/common/test_dropbox_connector.py @@ -0,0 +1,157 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import importlib.util +import sys +from datetime import datetime, timezone +from pathlib import Path +from types import ModuleType, SimpleNamespace + + +def _load_dropbox_connector_module(): + repo_root = Path(__file__).resolve().parents[3] + package_name = "common.data_source" + saved_modules = {name: module for name, module in sys.modules.items() if name == package_name or name.startswith(f"{package_name}.")} + package_stub = ModuleType(package_name) + package_stub.__path__ = [str(repo_root / "common" / "data_source")] + sys.modules[package_name] = package_stub + + try: + spec = importlib.util.spec_from_file_location( + "_dropbox_connector_under_test", + repo_root / "common" / "data_source" / "dropbox_connector.py", + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + finally: + for name in list(sys.modules): + if name == package_name or name.startswith(f"{package_name}."): + if name in saved_modules: + sys.modules[name] = saved_modules[name] + else: + sys.modules.pop(name, None) + + +dropbox_connector = _load_dropbox_connector_module() +DropboxConnector = dropbox_connector.DropboxConnector + + +class _FakeFileMetadata: + def __init__(self, file_id: str, name: str, path: str, client_modified: datetime, size: int = 10) -> None: + self.id = file_id + self.name = name + self.path_display = path + self.path_lower = path.lower() + self.client_modified = client_modified + self.size = size + + +class _FakeFolderMetadata: + def __init__(self, name: str, path: str) -> None: + self.name = name + self.path_display = path + self.path_lower = path.lower() + + +class _FakeListResult: + def __init__(self, entries: list, cursor: str = "", has_more: bool = False) -> None: + self.entries = entries + self.cursor = cursor + self.has_more = has_more + + +class _FakeDropboxClient: + def __init__(self) -> None: + self.downloaded_paths: list[str] = [] + self.root_file = _FakeFileMetadata( + "id-root", + "same.txt", + "/same.txt", + datetime(2026, 1, 1, 12, tzinfo=timezone.utc), + ) + self.nested_file = _FakeFileMetadata( + "id-nested", + "same.txt", + "/folder/same.txt", + datetime(2026, 1, 1, 13, tzinfo=timezone.utc), + ) + self.paged_file = _FakeFileMetadata( + "id-paged", + "unique.pdf", + "/unique.pdf", + datetime(2026, 1, 1, 14, tzinfo=timezone.utc), + ) + + def files_list_folder(self, path: str, **_kwargs): + if path == "": + return _FakeListResult( + [self.root_file, _FakeFolderMetadata("folder", "/folder")], + cursor="cursor-1", + has_more=True, + ) + if path == "/folder": + return _FakeListResult([self.nested_file]) + raise AssertionError(f"unexpected Dropbox folder path: {path}") + + def files_list_folder_continue(self, cursor: str): + assert cursor == "cursor-1" + return _FakeListResult([self.paged_file]) + + def files_download(self, path: str): + self.downloaded_paths.append(path) + return None, SimpleNamespace(content=f"content:{path}".encode()) + + +def test_retrieve_all_slim_docs_perm_sync_lists_current_file_ids_without_downloads(monkeypatch): + monkeypatch.setattr(dropbox_connector, "FileMetadata", _FakeFileMetadata) + monkeypatch.setattr(dropbox_connector, "FolderMetadata", _FakeFolderMetadata) + connector = DropboxConnector(batch_size=2) + fake_client = _FakeDropboxClient() + connector.dropbox_client = fake_client + + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + + assert [[doc.id for doc in batch] for batch in batches] == [ + ["dropbox:id-root", "dropbox:id-nested"], + ["dropbox:id-paged"], + ] + assert fake_client.downloaded_paths == [] + + +def test_load_from_state_keeps_duplicate_filename_semantic_paths(monkeypatch): + monkeypatch.setattr(dropbox_connector, "FileMetadata", _FakeFileMetadata) + monkeypatch.setattr(dropbox_connector, "FolderMetadata", _FakeFolderMetadata) + connector = DropboxConnector(batch_size=10) + fake_client = _FakeDropboxClient() + connector.dropbox_client = fake_client + + docs = list(next(connector.load_from_state())) + + assert [doc.id for doc in docs] == [ + "dropbox:id-root", + "dropbox:id-nested", + "dropbox:id-paged", + ] + assert [doc.semantic_identifier for doc in docs] == [ + "same.txt", + "folder / same.txt", + "unique.pdf", + ] + assert fake_client.downloaded_paths == [ + "/same.txt", + "/folder/same.txt", + "/unique.pdf", + ] diff --git a/test/unit_test/rag/test_sync_data_source.py b/test/unit_test/rag/test_sync_data_source.py index e76722ba1f..f513ec7a31 100644 --- a/test/unit_test/rag/test_sync_data_source.py +++ b/test/unit_test/rag/test_sync_data_source.py @@ -19,6 +19,7 @@ import os import sys import types import warnings +from datetime import datetime, timezone import pytest @@ -167,3 +168,95 @@ async def test_run_task_logic_cleans_up_for_non_empty_snapshot(monkeypatch): {}, ) ] + + +class _FakeDropboxConnector: + instance = None + + def __init__(self, batch_size): + self.batch_size = batch_size + self.credentials = None + self.retrieve_all_slim_docs_perm_sync_called = False + self.snapshot_called_before_poll = None + self.poll_source_call = None + self.load_from_state_called = False + self.poll_source_called = False + _FakeDropboxConnector.instance = self + + def load_credentials(self, credentials): + self.credentials = credentials + + def retrieve_all_slim_docs_perm_sync(self, callback=None): + del callback + self.retrieve_all_slim_docs_perm_sync_called = True + self.snapshot_called_before_poll = not self.poll_source_called + yield [types.SimpleNamespace(id="dropbox:id-1")] + yield [types.SimpleNamespace(id="dropbox:id-2")] + + def poll_source(self, start, end): + self.poll_source_called = True + self.poll_source_call = (start, end) + return iter((["poll-sync"],)) + + def load_from_state(self): + self.load_from_state_called = True + return iter((["full-sync"],)) + + +@pytest.mark.anyio +@pytest.mark.p2 +async def test_dropbox_generate_returns_snapshot_when_sync_deleted_enabled(monkeypatch): + monkeypatch.setattr(sync_data_source, "DropboxConnector", _FakeDropboxConnector) + poll_start = datetime(2026, 1, 1, tzinfo=timezone.utc) + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": poll_start, + "skip_connection_log": True, + } + sync = sync_data_source.Dropbox( + { + "batch_size": 2, + "sync_deleted_files": True, + "credentials": {"dropbox_access_token": "token-1"}, + } + ) + + document_generator, file_list = await sync._generate(task) + connector = _FakeDropboxConnector.instance + + assert list(document_generator) == [["poll-sync"]] + assert [doc.id for doc in file_list] == ["dropbox:id-1", "dropbox:id-2"] + assert connector.credentials == {"dropbox_access_token": "token-1"} + assert connector.retrieve_all_slim_docs_perm_sync_called is True + assert connector.snapshot_called_before_poll is True + assert connector.poll_source_call[0] == poll_start.timestamp() + assert connector.poll_source_call[1] >= poll_start.timestamp() + + +@pytest.mark.anyio +@pytest.mark.p2 +async def test_dropbox_generate_skips_snapshot_for_full_reindex(monkeypatch): + monkeypatch.setattr(sync_data_source, "DropboxConnector", _FakeDropboxConnector) + task = { + **_make_task(), + "reindex": "1", + "poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc), + "skip_connection_log": True, + } + sync = sync_data_source.Dropbox( + { + "batch_size": 2, + "sync_deleted_files": True, + "credentials": {"dropbox_access_token": "token-1"}, + } + ) + + document_generator, file_list = await sync._generate(task) + connector = _FakeDropboxConnector.instance + + assert list(document_generator) == [["full-sync"]] + assert file_list is None + assert connector.load_from_state_called is True + assert connector.retrieve_all_slim_docs_perm_sync_called is False + assert connector.poll_source_called is False diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 78e0f59906..efe1c687e4 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -73,6 +73,9 @@ export const DataSourceFeatureVisibilityMap = { [DataSourceKey.BOX]: { syncDeletedFiles: true, }, + [DataSourceKey.DROPBOX]: { + syncDeletedFiles: true, + }, [DataSourceKey.S3]: { syncDeletedFiles: true, },