mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
feat(dropbox): support deleted-file sync (#14476)
### What problem does this PR solve? Partially addresses #14362 by adding deleted-file sync support for the Dropbox data source. Dropbox previously did not provide the slim current-file snapshot required by stale document reconciliation, and its sync runner returned only document batches. As a result, enabling deleted-file sync could not remove local documents that had been deleted from Dropbox. This PR: - Adds `retrieve_all_slim_docs_perm_sync()` to `DropboxConnector`. - Reuses Dropbox metadata traversal to collect current remote file IDs without downloading file contents. - Wires incremental Dropbox sync to return `(document_generator, file_list)` when `sync_deleted_files` is enabled. - Enables the deleted-file sync toggle for Dropbox in the data source settings UI. - Adds regression coverage for slim snapshots, nested folders, paginated listings, duplicate filenames, and full reindex behavior. Tests: - `uv run pytest test/unit_test/common/test_dropbox_connector.py -q` - `uv run pytest test/unit_test/rag/test_sync_data_source.py -q` - `uv run pytest test/unit_test/common/test_dropbox_connector.py test/unit_test/rag/test_sync_data_source.py -q` - `uv run ruff check common/data_source/dropbox_connector.py rag/svr/sync_data_source.py test/unit_test/common/test_dropbox_connector.py test/unit_test/rag/test_sync_data_source.py` - `./node_modules/.bin/eslint src/pages/user-setting/data-source/constant/index.tsx` ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@@ -14,14 +14,14 @@ from common.data_source.exceptions import (
|
||||
ConnectorValidationError,
|
||||
InsufficientPermissionsError,
|
||||
)
|
||||
from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch
|
||||
from common.data_source.models import Document, GenerateDocumentsOutput
|
||||
from common.data_source.interfaces import LoadConnector, PollConnector, SecondsSinceUnixEpoch, SlimConnectorWithPermSync
|
||||
from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SlimDocument
|
||||
from common.data_source.utils import get_file_ext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DropboxConnector(LoadConnector, PollConnector):
|
||||
class DropboxConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
|
||||
"""Dropbox connector for accessing Dropbox files and folders"""
|
||||
|
||||
def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
|
||||
@@ -87,57 +87,48 @@ class DropboxConnector(LoadConnector, PollConnector):
|
||||
if self.dropbox_client is None:
|
||||
raise ConnectorMissingCredentialError("Dropbox")
|
||||
|
||||
# Collect all files first to count filename occurrences
|
||||
all_files = []
|
||||
self._collect_files_recursive(path, start, end, all_files)
|
||||
|
||||
all_files: list[FileMetadata] = []
|
||||
self._collect_file_entries_recursive(path, start, end, all_files)
|
||||
|
||||
# Count filename occurrences
|
||||
filename_counts: dict[str, int] = {}
|
||||
for entry, _ in all_files:
|
||||
for entry in all_files:
|
||||
filename_counts[entry.name] = filename_counts.get(entry.name, 0) + 1
|
||||
|
||||
|
||||
# Process files in batches
|
||||
batch: list[Document] = []
|
||||
for entry, downloaded_file in all_files:
|
||||
modified_time = entry.client_modified
|
||||
if modified_time.tzinfo is None:
|
||||
modified_time = modified_time.replace(tzinfo=timezone.utc)
|
||||
else:
|
||||
modified_time = modified_time.astimezone(timezone.utc)
|
||||
|
||||
# Use full path only if filename appears multiple times
|
||||
if filename_counts.get(entry.name, 0) > 1:
|
||||
# Remove leading slash and replace slashes with ' / '
|
||||
relative_path = entry.path_display.lstrip('/')
|
||||
semantic_id = relative_path.replace('/', ' / ') if relative_path else entry.name
|
||||
else:
|
||||
semantic_id = entry.name
|
||||
|
||||
for entry in all_files:
|
||||
try:
|
||||
downloaded_file = self._download_file(entry.path_display)
|
||||
except Exception:
|
||||
logger.exception(f"[Dropbox]: Error downloading file {entry.path_display}")
|
||||
continue
|
||||
|
||||
batch.append(
|
||||
Document(
|
||||
id=f"dropbox:{entry.id}",
|
||||
blob=downloaded_file,
|
||||
source=DocumentSource.DROPBOX,
|
||||
semantic_identifier=semantic_id,
|
||||
semantic_identifier=self._get_semantic_identifier(entry, filename_counts),
|
||||
extension=get_file_ext(entry.name),
|
||||
doc_updated_at=modified_time,
|
||||
doc_updated_at=self._normalize_modified_time(entry.client_modified),
|
||||
size_bytes=entry.size if getattr(entry, "size", None) is not None else len(downloaded_file),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
if len(batch) == self.batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
|
||||
|
||||
if batch:
|
||||
yield batch
|
||||
|
||||
def _collect_files_recursive(
|
||||
def _collect_file_entries_recursive(
|
||||
self,
|
||||
path: str,
|
||||
start: SecondsSinceUnixEpoch | None,
|
||||
end: SecondsSinceUnixEpoch | None,
|
||||
all_files: list,
|
||||
all_files: list[FileMetadata],
|
||||
) -> None:
|
||||
"""Recursively collect all files matching time criteria."""
|
||||
if self.dropbox_client is None:
|
||||
@@ -152,33 +143,56 @@ class DropboxConnector(LoadConnector, PollConnector):
|
||||
while True:
|
||||
for entry in result.entries:
|
||||
if isinstance(entry, FileMetadata):
|
||||
modified_time = entry.client_modified
|
||||
if modified_time.tzinfo is None:
|
||||
modified_time = modified_time.replace(tzinfo=timezone.utc)
|
||||
else:
|
||||
modified_time = modified_time.astimezone(timezone.utc)
|
||||
|
||||
time_as_seconds = modified_time.timestamp()
|
||||
time_as_seconds = self._normalize_modified_time(entry.client_modified).timestamp()
|
||||
if start is not None and time_as_seconds <= start:
|
||||
continue
|
||||
if end is not None and time_as_seconds > end:
|
||||
continue
|
||||
|
||||
try:
|
||||
downloaded_file = self._download_file(entry.path_display)
|
||||
all_files.append((entry, downloaded_file))
|
||||
except Exception:
|
||||
logger.exception(f"[Dropbox]: Error downloading file {entry.path_display}")
|
||||
continue
|
||||
all_files.append(entry)
|
||||
|
||||
elif isinstance(entry, FolderMetadata):
|
||||
self._collect_files_recursive(entry.path_lower, start, end, all_files)
|
||||
self._collect_file_entries_recursive(entry.path_lower, start, end, all_files)
|
||||
|
||||
if not result.has_more:
|
||||
break
|
||||
|
||||
result = self.dropbox_client.files_list_folder_continue(result.cursor)
|
||||
|
||||
def _normalize_modified_time(self, modified_time):
|
||||
if modified_time.tzinfo is None:
|
||||
return modified_time.replace(tzinfo=timezone.utc)
|
||||
return modified_time.astimezone(timezone.utc)
|
||||
|
||||
def _get_semantic_identifier(self, entry: FileMetadata, filename_counts: dict[str, int]) -> str:
|
||||
if filename_counts.get(entry.name, 0) <= 1:
|
||||
return entry.name
|
||||
|
||||
relative_path = entry.path_display.lstrip("/")
|
||||
return relative_path.replace("/", " / ") if relative_path else entry.name
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(
|
||||
self,
|
||||
callback: Any = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
del callback
|
||||
|
||||
if self.dropbox_client is None:
|
||||
raise ConnectorMissingCredentialError("Dropbox")
|
||||
|
||||
all_files: list[FileMetadata] = []
|
||||
self._collect_file_entries_recursive("", None, None, all_files)
|
||||
|
||||
batch: list[SlimDocument] = []
|
||||
for entry in all_files:
|
||||
batch.append(SlimDocument(id=f"dropbox:{entry.id}"))
|
||||
if len(batch) >= self.batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
|
||||
if batch:
|
||||
yield batch
|
||||
|
||||
def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput:
|
||||
"""Poll Dropbox for recent file changes"""
|
||||
if self.dropbox_client is None:
|
||||
|
||||
@@ -617,19 +617,23 @@ class Dropbox(SyncBase):
|
||||
async def _generate(self, task: dict):
|
||||
self.connector = DropboxConnector(batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE))
|
||||
self.connector.load_credentials(self.conf["credentials"])
|
||||
poll_start = task["poll_range_start"]
|
||||
file_list = None
|
||||
|
||||
if task["reindex"] == "1" or not task["poll_range_start"]:
|
||||
if task["reindex"] == "1" or not poll_start:
|
||||
document_generator = self.connector.load_from_state()
|
||||
_begin_info = "totally"
|
||||
else:
|
||||
poll_start = task["poll_range_start"]
|
||||
document_generator = self.connector.poll_source(
|
||||
poll_start.timestamp(), datetime.now(timezone.utc).timestamp()
|
||||
)
|
||||
end_time = datetime.now(timezone.utc).timestamp()
|
||||
if self.conf.get("sync_deleted_files"):
|
||||
file_list = []
|
||||
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync():
|
||||
file_list.extend(slim_batch)
|
||||
document_generator = self.connector.poll_source(poll_start.timestamp(), end_time)
|
||||
_begin_info = f"from {poll_start}"
|
||||
|
||||
self.log_connection("Dropbox", "workspace", task)
|
||||
return document_generator
|
||||
return document_generator, file_list
|
||||
|
||||
|
||||
class GoogleDrive(SyncBase):
|
||||
|
||||
157
test/unit_test/common/test_dropbox_connector.py
Normal file
157
test/unit_test/common/test_dropbox_connector.py
Normal file
@@ -0,0 +1,157 @@
|
||||
#
|
||||
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import importlib.util
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from types import ModuleType, SimpleNamespace
|
||||
|
||||
|
||||
def _load_dropbox_connector_module():
|
||||
repo_root = Path(__file__).resolve().parents[3]
|
||||
package_name = "common.data_source"
|
||||
saved_modules = {name: module for name, module in sys.modules.items() if name == package_name or name.startswith(f"{package_name}.")}
|
||||
package_stub = ModuleType(package_name)
|
||||
package_stub.__path__ = [str(repo_root / "common" / "data_source")]
|
||||
sys.modules[package_name] = package_stub
|
||||
|
||||
try:
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"_dropbox_connector_under_test",
|
||||
repo_root / "common" / "data_source" / "dropbox_connector.py",
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
finally:
|
||||
for name in list(sys.modules):
|
||||
if name == package_name or name.startswith(f"{package_name}."):
|
||||
if name in saved_modules:
|
||||
sys.modules[name] = saved_modules[name]
|
||||
else:
|
||||
sys.modules.pop(name, None)
|
||||
|
||||
|
||||
dropbox_connector = _load_dropbox_connector_module()
|
||||
DropboxConnector = dropbox_connector.DropboxConnector
|
||||
|
||||
|
||||
class _FakeFileMetadata:
|
||||
def __init__(self, file_id: str, name: str, path: str, client_modified: datetime, size: int = 10) -> None:
|
||||
self.id = file_id
|
||||
self.name = name
|
||||
self.path_display = path
|
||||
self.path_lower = path.lower()
|
||||
self.client_modified = client_modified
|
||||
self.size = size
|
||||
|
||||
|
||||
class _FakeFolderMetadata:
|
||||
def __init__(self, name: str, path: str) -> None:
|
||||
self.name = name
|
||||
self.path_display = path
|
||||
self.path_lower = path.lower()
|
||||
|
||||
|
||||
class _FakeListResult:
|
||||
def __init__(self, entries: list, cursor: str = "", has_more: bool = False) -> None:
|
||||
self.entries = entries
|
||||
self.cursor = cursor
|
||||
self.has_more = has_more
|
||||
|
||||
|
||||
class _FakeDropboxClient:
|
||||
def __init__(self) -> None:
|
||||
self.downloaded_paths: list[str] = []
|
||||
self.root_file = _FakeFileMetadata(
|
||||
"id-root",
|
||||
"same.txt",
|
||||
"/same.txt",
|
||||
datetime(2026, 1, 1, 12, tzinfo=timezone.utc),
|
||||
)
|
||||
self.nested_file = _FakeFileMetadata(
|
||||
"id-nested",
|
||||
"same.txt",
|
||||
"/folder/same.txt",
|
||||
datetime(2026, 1, 1, 13, tzinfo=timezone.utc),
|
||||
)
|
||||
self.paged_file = _FakeFileMetadata(
|
||||
"id-paged",
|
||||
"unique.pdf",
|
||||
"/unique.pdf",
|
||||
datetime(2026, 1, 1, 14, tzinfo=timezone.utc),
|
||||
)
|
||||
|
||||
def files_list_folder(self, path: str, **_kwargs):
|
||||
if path == "":
|
||||
return _FakeListResult(
|
||||
[self.root_file, _FakeFolderMetadata("folder", "/folder")],
|
||||
cursor="cursor-1",
|
||||
has_more=True,
|
||||
)
|
||||
if path == "/folder":
|
||||
return _FakeListResult([self.nested_file])
|
||||
raise AssertionError(f"unexpected Dropbox folder path: {path}")
|
||||
|
||||
def files_list_folder_continue(self, cursor: str):
|
||||
assert cursor == "cursor-1"
|
||||
return _FakeListResult([self.paged_file])
|
||||
|
||||
def files_download(self, path: str):
|
||||
self.downloaded_paths.append(path)
|
||||
return None, SimpleNamespace(content=f"content:{path}".encode())
|
||||
|
||||
|
||||
def test_retrieve_all_slim_docs_perm_sync_lists_current_file_ids_without_downloads(monkeypatch):
|
||||
monkeypatch.setattr(dropbox_connector, "FileMetadata", _FakeFileMetadata)
|
||||
monkeypatch.setattr(dropbox_connector, "FolderMetadata", _FakeFolderMetadata)
|
||||
connector = DropboxConnector(batch_size=2)
|
||||
fake_client = _FakeDropboxClient()
|
||||
connector.dropbox_client = fake_client
|
||||
|
||||
batches = list(connector.retrieve_all_slim_docs_perm_sync())
|
||||
|
||||
assert [[doc.id for doc in batch] for batch in batches] == [
|
||||
["dropbox:id-root", "dropbox:id-nested"],
|
||||
["dropbox:id-paged"],
|
||||
]
|
||||
assert fake_client.downloaded_paths == []
|
||||
|
||||
|
||||
def test_load_from_state_keeps_duplicate_filename_semantic_paths(monkeypatch):
|
||||
monkeypatch.setattr(dropbox_connector, "FileMetadata", _FakeFileMetadata)
|
||||
monkeypatch.setattr(dropbox_connector, "FolderMetadata", _FakeFolderMetadata)
|
||||
connector = DropboxConnector(batch_size=10)
|
||||
fake_client = _FakeDropboxClient()
|
||||
connector.dropbox_client = fake_client
|
||||
|
||||
docs = list(next(connector.load_from_state()))
|
||||
|
||||
assert [doc.id for doc in docs] == [
|
||||
"dropbox:id-root",
|
||||
"dropbox:id-nested",
|
||||
"dropbox:id-paged",
|
||||
]
|
||||
assert [doc.semantic_identifier for doc in docs] == [
|
||||
"same.txt",
|
||||
"folder / same.txt",
|
||||
"unique.pdf",
|
||||
]
|
||||
assert fake_client.downloaded_paths == [
|
||||
"/same.txt",
|
||||
"/folder/same.txt",
|
||||
"/unique.pdf",
|
||||
]
|
||||
@@ -19,6 +19,7 @@ import os
|
||||
import sys
|
||||
import types
|
||||
import warnings
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -167,3 +168,95 @@ async def test_run_task_logic_cleans_up_for_non_empty_snapshot(monkeypatch):
|
||||
{},
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
class _FakeDropboxConnector:
|
||||
instance = None
|
||||
|
||||
def __init__(self, batch_size):
|
||||
self.batch_size = batch_size
|
||||
self.credentials = None
|
||||
self.retrieve_all_slim_docs_perm_sync_called = False
|
||||
self.snapshot_called_before_poll = None
|
||||
self.poll_source_call = None
|
||||
self.load_from_state_called = False
|
||||
self.poll_source_called = False
|
||||
_FakeDropboxConnector.instance = self
|
||||
|
||||
def load_credentials(self, credentials):
|
||||
self.credentials = credentials
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(self, callback=None):
|
||||
del callback
|
||||
self.retrieve_all_slim_docs_perm_sync_called = True
|
||||
self.snapshot_called_before_poll = not self.poll_source_called
|
||||
yield [types.SimpleNamespace(id="dropbox:id-1")]
|
||||
yield [types.SimpleNamespace(id="dropbox:id-2")]
|
||||
|
||||
def poll_source(self, start, end):
|
||||
self.poll_source_called = True
|
||||
self.poll_source_call = (start, end)
|
||||
return iter((["poll-sync"],))
|
||||
|
||||
def load_from_state(self):
|
||||
self.load_from_state_called = True
|
||||
return iter((["full-sync"],))
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.p2
|
||||
async def test_dropbox_generate_returns_snapshot_when_sync_deleted_enabled(monkeypatch):
|
||||
monkeypatch.setattr(sync_data_source, "DropboxConnector", _FakeDropboxConnector)
|
||||
poll_start = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
task = {
|
||||
**_make_task(),
|
||||
"reindex": "0",
|
||||
"poll_range_start": poll_start,
|
||||
"skip_connection_log": True,
|
||||
}
|
||||
sync = sync_data_source.Dropbox(
|
||||
{
|
||||
"batch_size": 2,
|
||||
"sync_deleted_files": True,
|
||||
"credentials": {"dropbox_access_token": "token-1"},
|
||||
}
|
||||
)
|
||||
|
||||
document_generator, file_list = await sync._generate(task)
|
||||
connector = _FakeDropboxConnector.instance
|
||||
|
||||
assert list(document_generator) == [["poll-sync"]]
|
||||
assert [doc.id for doc in file_list] == ["dropbox:id-1", "dropbox:id-2"]
|
||||
assert connector.credentials == {"dropbox_access_token": "token-1"}
|
||||
assert connector.retrieve_all_slim_docs_perm_sync_called is True
|
||||
assert connector.snapshot_called_before_poll is True
|
||||
assert connector.poll_source_call[0] == poll_start.timestamp()
|
||||
assert connector.poll_source_call[1] >= poll_start.timestamp()
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.p2
|
||||
async def test_dropbox_generate_skips_snapshot_for_full_reindex(monkeypatch):
|
||||
monkeypatch.setattr(sync_data_source, "DropboxConnector", _FakeDropboxConnector)
|
||||
task = {
|
||||
**_make_task(),
|
||||
"reindex": "1",
|
||||
"poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc),
|
||||
"skip_connection_log": True,
|
||||
}
|
||||
sync = sync_data_source.Dropbox(
|
||||
{
|
||||
"batch_size": 2,
|
||||
"sync_deleted_files": True,
|
||||
"credentials": {"dropbox_access_token": "token-1"},
|
||||
}
|
||||
)
|
||||
|
||||
document_generator, file_list = await sync._generate(task)
|
||||
connector = _FakeDropboxConnector.instance
|
||||
|
||||
assert list(document_generator) == [["full-sync"]]
|
||||
assert file_list is None
|
||||
assert connector.load_from_state_called is True
|
||||
assert connector.retrieve_all_slim_docs_perm_sync_called is False
|
||||
assert connector.poll_source_called is False
|
||||
|
||||
@@ -73,6 +73,9 @@ export const DataSourceFeatureVisibilityMap = {
|
||||
[DataSourceKey.BOX]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.DROPBOX]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.S3]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user