diff --git a/common/constants.py b/common/constants.py index a766a21710..60b09aa8bb 100644 --- a/common/constants.py +++ b/common/constants.py @@ -154,6 +154,7 @@ class FileSource(StrEnum): MYSQL = "mysql" POSTGRESQL = "postgresql" DINGTALK_AI_TABLE = "dingtalk_ai_table" + ONEDRIVE = "onedrive" class PipelineTaskType(StrEnum): diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index 34bb467d9f..2a3682747d 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -34,6 +34,7 @@ from .dropbox_connector import DropboxConnector from .google_drive.connector import GoogleDriveConnector from .jira.connector import JiraConnector from .sharepoint_connector import SharePointConnector +from .onedrive_connector import OneDriveConnector from .teams_connector import TeamsConnector from .moodle_connector import MoodleConnector from .airtable_connector import AirtableConnector @@ -67,6 +68,7 @@ __all__ = [ "GoogleDriveConnector", "JiraConnector", "SharePointConnector", + "OneDriveConnector", "TeamsConnector", "MoodleConnector", "BlobType", diff --git a/common/data_source/config.py b/common/data_source/config.py index 08d0209e03..c445844b70 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -69,6 +69,7 @@ class DocumentSource(str, Enum): MYSQL = "mysql" POSTGRESQL = "postgresql" DINGTALK_AI_TABLE = "dingtalk_ai_table" + ONEDRIVE = "onedrive" class FileOrigin(str, Enum): diff --git a/common/data_source/onedrive_connector.py b/common/data_source/onedrive_connector.py new file mode 100644 index 0000000000..ffe26e82bf --- /dev/null +++ b/common/data_source/onedrive_connector.py @@ -0,0 +1,363 @@ +"""OneDrive data source connector""" + +import logging +from typing import Any, Generator + +import msal +import requests + +from common.data_source.config import INDEX_BATCH_SIZE +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + InsufficientPermissionsError, + UnexpectedValidationError, +) +from common.data_source.interfaces import ( + CheckpointedConnectorWithPermSync, + SecondsSinceUnixEpoch, + SlimConnectorWithPermSync, +) +from common.data_source.models import ConnectorCheckpoint, SlimDocument + +logger = logging.getLogger(__name__) + +_GRAPH_BASE = "https://graph.microsoft.com/v1.0" +_GRAPH_SCOPE = ["https://graph.microsoft.com/.default"] + +# File extensions we support for ingestion +_SUPPORTED_EXTENSIONS = { + ".pdf", ".docx", ".doc", ".xlsx", ".xls", + ".pptx", ".ppt", ".txt", ".md", ".csv", +} + + +class OneDriveCheckpoint(ConnectorCheckpoint): + """OneDrive-specific checkpoint tracking delta links per drive.""" + delta_links: dict[str, str] | None = None + + +class OneDriveConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync): + """ + OneDrive / OneDrive for Business connector. + + Uses Microsoft Graph delta queries so incremental syncs only fetch + changed items. Requires application permissions: + - Files.Read.All + """ + + def __init__( + self, + batch_size: int = INDEX_BATCH_SIZE, + folder_path: str | None = None, + ) -> None: + self.batch_size = batch_size + self.folder_path = folder_path # optional sub-folder filter, e.g. "/Documents" + self._access_token: str | None = None + self._tenant_id: str | None = None + + # ------------------------------------------------------------------ + # Auth + # ------------------------------------------------------------------ + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + tenant_id = credentials.get("tenant_id") + client_id = credentials.get("client_id") + client_secret = credentials.get("client_secret") + + if not all([tenant_id, client_id, client_secret]): + raise ConnectorMissingCredentialError( + "OneDrive credentials are incomplete (tenant_id, client_id, client_secret required)" + ) + + self._tenant_id = tenant_id + + app = msal.ConfidentialClientApplication( + client_id=client_id, + client_credential=client_secret, + authority=f"https://login.microsoftonline.com/{tenant_id}", + ) + result = app.acquire_token_for_client(scopes=_GRAPH_SCOPE) + + if "access_token" not in result: + error = result.get("error_description", result.get("error", "unknown")) + raise ConnectorMissingCredentialError( + f"Failed to acquire OneDrive access token: {error}" + ) + + self._access_token = result["access_token"] + return None + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + def validate_connector_settings(self) -> None: + if not self._access_token: + raise ConnectorMissingCredentialError("OneDrive") + + # Probe: list the first page of drives in the tenant. + # Requires Files.Read.All. + resp = self._get(f"{_GRAPH_BASE}/drives?$top=1") + if resp.status_code == 401: + raise ConnectorMissingCredentialError( + "OneDrive access token is invalid or expired." + ) + if resp.status_code == 403: + raise InsufficientPermissionsError( + "The service principal lacks the 'Files.Read.All' permission " + "required by the OneDrive connector." + ) + if not resp.ok: + raise UnexpectedValidationError( + f"OneDrive validation failed (HTTP {resp.status_code}): {resp.text[:200]}" + ) + + data = resp.json() + if "value" not in data: + raise ConnectorValidationError( + "Unexpected response format from Microsoft Graph /drives." + ) + + # ------------------------------------------------------------------ + # Checkpoint helpers + # ------------------------------------------------------------------ + + def build_dummy_checkpoint(self) -> OneDriveCheckpoint: + return OneDriveCheckpoint(has_more=True, delta_links={}) + + def validate_checkpoint_json(self, checkpoint_json: str) -> OneDriveCheckpoint: + try: + return OneDriveCheckpoint.model_validate_json(checkpoint_json) + except Exception: + return self.build_dummy_checkpoint() + + # ------------------------------------------------------------------ + # Core data loading + # ------------------------------------------------------------------ + + def poll_source( + self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch + ) -> Any: + """Return documents modified at or after *start* (epoch seconds). + + Kept for callers that prefer the time-window interface; internally + defers to the same delta-walk used by load_from_checkpoint and + filters in-window items by lastModifiedDateTime. + """ + return self._iter_documents(since_epoch=start) + + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ConnectorCheckpoint, + ) -> Any: + """Resume from *checkpoint*'s delta_links and apply the start filter. + + The delta_links map carries per-drive @odata.deltaLink values from the + previous run; when present the walk resumes from those links instead + of crawling each drive's root, which is what makes incremental syncs + cheap. The start_time is still applied as a lastModifiedDateTime + floor so callers that pass a window (and have no persisted delta + link yet) don't have to re-process everything. + """ + if not isinstance(checkpoint, OneDriveCheckpoint): + checkpoint = self.build_dummy_checkpoint() + since = start if start else None + return self._iter_documents(checkpoint=checkpoint, since_epoch=since) + + def load_from_checkpoint_with_perm_sync( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ConnectorCheckpoint, + ) -> Any: + return self.load_from_checkpoint(start, end, checkpoint) + + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> Generator[list[SlimDocument], None, None]: + """Yield batches of slim documents for prune / permission sync. + + The prune collector in rag/svr/sync_data_source._collect_prune_snapshot + calls list.extend(batch) on each yielded value and then accesses + `.id` on every retained item (see + api/db/services/connector_service.cleanup_stale_documents_for_task). + Yielding SlimDocument batches matches both contracts. + """ + if not self._access_token: + raise ConnectorMissingCredentialError("OneDrive") + + batch: list[SlimDocument] = [] + for drive_id in self._list_drive_ids(): + url: str | None = self._delta_url(drive_id) + while url: + data = self._get_json(url, context=f"prune drive={drive_id}") + for item in data.get("value", []): + if "file" not in item or item.get("deleted"): + continue + item_id = item.get("id") + if not item_id: + continue + if callback: + callback(item_id, item.get("name", "")) + batch.append(SlimDocument(id=item_id)) + if len(batch) >= self.batch_size: + yield batch + batch = [] + url = data.get("@odata.nextLink") + if batch: + yield batch + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _get(self, url: str) -> requests.Response: + return requests.get( + url, + headers={"Authorization": f"Bearer {self._access_token}"}, + timeout=60, + ) + + def _get_json(self, url: str, *, context: str) -> dict: + """GET *url* and decode JSON. Raise on non-2xx so the caller never + treats a 429 / 5xx as an empty page and silently advances the + checkpoint past missing data. + """ + resp = self._get(url) + if not resp.ok: + body_snippet = resp.text[:200] if resp.text else "" + logger.error( + "OneDrive Graph request failed (%s): HTTP %s url=%s body=%s", + context, + resp.status_code, + url, + body_snippet, + ) + raise UnexpectedValidationError( + f"OneDrive Graph request failed ({context}): HTTP {resp.status_code} {body_snippet}" + ) + try: + return resp.json() + except ValueError as exc: + raise UnexpectedValidationError( + f"OneDrive Graph response is not JSON ({context}): {exc}" + ) + + def _list_drive_ids(self) -> list[str]: + """Return all drive IDs visible to the service principal.""" + ids: list[str] = [] + url: str | None = f"{_GRAPH_BASE}/drives" + while url: + data = self._get_json(url, context="list drives") + ids.extend(d["id"] for d in data.get("value", []) if d.get("id")) + url = data.get("@odata.nextLink") + return ids + + def _delta_url(self, drive_id: str, delta_link: str | None = None) -> str: + if delta_link: + return delta_link + base = f"{_GRAPH_BASE}/drives/{drive_id}/root/delta" + if self.folder_path: + # Use /drive/root:/{path}:/delta for scoped delta + base = f"{_GRAPH_BASE}/drives/{drive_id}/root:{self.folder_path}:/delta" + return base + + def _iter_documents( + self, + checkpoint: OneDriveCheckpoint | None = None, + since_epoch: float | None = None, + ): + """ + Generator that yields batches of Document objects. + + Uses Graph delta queries. When *checkpoint* is supplied its + delta links are used; otherwise a full crawl is performed. + """ + from datetime import datetime, timezone + + from common.data_source.models import Document + + delta_links: dict[str, str] = {} + if checkpoint and checkpoint.delta_links: + delta_links = dict(checkpoint.delta_links) + + batch: list[Document] = [] + + for drive_id in self._list_drive_ids(): + start_url = self._delta_url(drive_id, delta_links.get(drive_id)) + url: str | None = start_url + next_delta: str | None = None + + while url: + data = self._get_json(url, context=f"delta drive={drive_id}") + + for item in data.get("value", []): + # Skip folders and deleted items + if "file" not in item or item.get("deleted"): + continue + + name: str = item.get("name", "") + ext = "." + name.rsplit(".", 1)[-1].lower() if "." in name else "" + if ext not in _SUPPORTED_EXTENSIONS: + continue + + modified_str: str = item.get("lastModifiedDateTime", "") + modified_ts: float | None = None + if modified_str: + try: + dt = datetime.fromisoformat( + modified_str.replace("Z", "+00:00") + ) + modified_ts = dt.timestamp() + except ValueError: + pass + + # For poll_source: skip items outside the time window + if since_epoch and modified_ts and modified_ts < since_epoch: + continue + + doc_updated_at = ( + datetime.fromtimestamp(modified_ts, tz=timezone.utc) + if modified_ts + else datetime.now(timezone.utc) + ) + doc = Document( + id=item["id"], + source="onedrive", + semantic_identifier=name, + extension=ext, + blob=b"", + doc_updated_at=doc_updated_at, + size_bytes=int(item.get("size", 0) or 0), + metadata={ + "drive_id": drive_id, + "web_url": item.get("webUrl", ""), + "created_by": ( + item.get("createdBy", {}) + .get("user", {}) + .get("displayName", "") + ), + }, + ) + batch.append(doc) + if len(batch) >= self.batch_size: + yield batch + batch = [] + + next_delta = data.get("@odata.deltaLink") + url = data.get("@odata.nextLink") + + if next_delta: + delta_links[drive_id] = next_delta + + if batch: + yield batch + + # Update checkpoint + if checkpoint is not None: + checkpoint.delta_links = delta_links + checkpoint.has_more = False diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 2283458f03..9ce2924712 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -61,6 +61,7 @@ from common.data_source import ( RDBMSConnector, DingTalkAITableConnector, RestAPIConnector, + OneDriveConnector, TeamsConnector, SlackConnector, SharePointConnector, @@ -997,6 +998,52 @@ class SharePoint(SyncBase): return document_batches() +class OneDrive(SyncBase): + SOURCE_NAME: str = FileSource.ONEDRIVE + + async def _generate(self, task: dict): + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + self.connector = OneDriveConnector( + batch_size=batch_size, + folder_path=self.conf.get("folder_path") or None, + ) + self.connector.load_credentials(self.conf["credentials"]) + + # Always route through load_from_checkpoint so the connector owns the + # delta-link bookkeeping; incremental runs pass the previous poll + # range start as the lastModifiedDateTime floor while the same delta + # walk drives both modes. poll_source disregarded the checkpoint + # entirely, which would have re-walked every drive's root each run. + if task["reindex"] == "1" or not task["poll_range_start"]: + start_ts = 0.0 + else: + start_ts = task["poll_range_start"].timestamp() + end_ts = datetime.now(timezone.utc).timestamp() + checkpoint = self.connector.build_dummy_checkpoint() + document_batch_generator = self.connector.load_from_checkpoint( + start_ts, end_ts, checkpoint + ) + + self.log_connection( + "OneDrive", + self.conf.get("folder_path", "/") or "/", + task, + ) + + def wrapper(): + for document_batch in document_batch_generator: + yield document_batch + + return wrapper() + + class Slack(SyncBase): SOURCE_NAME: str = FileSource.SLACK @@ -1857,6 +1904,7 @@ func_factory = { FileSource.GOOGLE_DRIVE: GoogleDrive, FileSource.JIRA: Jira, FileSource.SHAREPOINT: SharePoint, + FileSource.ONEDRIVE: OneDrive, FileSource.SLACK: Slack, FileSource.TEAMS: Teams, FileSource.MOODLE: Moodle, diff --git a/test/unit_test/data_source/test_onedrive_connector_unit.py b/test/unit_test/data_source/test_onedrive_connector_unit.py new file mode 100644 index 0000000000..7dfed16993 --- /dev/null +++ b/test/unit_test/data_source/test_onedrive_connector_unit.py @@ -0,0 +1,396 @@ +"""Unit tests for OneDriveConnector.""" + +import pytest +from unittest.mock import MagicMock, patch + +from common.data_source.onedrive_connector import OneDriveConnector, OneDriveCheckpoint +from common.data_source.models import SlimDocument +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + InsufficientPermissionsError, + UnexpectedValidationError, +) + + +_GOOD_CREDS = { + "tenant_id": "tenant-123", + "client_id": "client-abc", + "client_secret": "secret-xyz", +} + + +# --------------------------------------------------------------------------- +# load_credentials +# --------------------------------------------------------------------------- + +@pytest.mark.p2 +def test_load_credentials_missing_fields_raises(): + connector = OneDriveConnector() + with pytest.raises(ConnectorMissingCredentialError): + connector.load_credentials({"tenant_id": "t", "client_id": "c"}) # missing secret + + +@pytest.mark.p1 +def test_load_credentials_success(): + connector = OneDriveConnector() + mock_app = MagicMock() + mock_app.acquire_token_for_client.return_value = {"access_token": "tok-abc"} + + with patch("common.data_source.onedrive_connector.msal.ConfidentialClientApplication", return_value=mock_app): + result = connector.load_credentials(_GOOD_CREDS) + + assert result is None + assert connector._access_token == "tok-abc" + assert connector._tenant_id == "tenant-123" + + +@pytest.mark.p2 +def test_load_credentials_msal_failure_raises(): + connector = OneDriveConnector() + mock_app = MagicMock() + mock_app.acquire_token_for_client.return_value = { + "error": "invalid_client", + "error_description": "AADSTS70011", + } + + with patch("common.data_source.onedrive_connector.msal.ConfidentialClientApplication", return_value=mock_app): + with pytest.raises(ConnectorMissingCredentialError, match="AADSTS70011"): + connector.load_credentials(_GOOD_CREDS) + + +# --------------------------------------------------------------------------- +# validate_connector_settings +# --------------------------------------------------------------------------- + +@pytest.mark.p2 +def test_validate_without_credentials_raises(): + connector = OneDriveConnector() + with pytest.raises(ConnectorMissingCredentialError): + connector.validate_connector_settings() + + +@pytest.mark.p1 +def test_validate_success(): + connector = OneDriveConnector() + connector._access_token = "tok" + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.ok = True + mock_resp.json.return_value = {"value": [{"id": "drive-1"}]} + + with patch.object(connector, "_get", return_value=mock_resp): + connector.validate_connector_settings() # should not raise + + +@pytest.mark.p2 +def test_validate_401_raises_missing_credential(): + connector = OneDriveConnector() + connector._access_token = "expired" + + mock_resp = MagicMock() + mock_resp.status_code = 401 + mock_resp.ok = False + + with patch.object(connector, "_get", return_value=mock_resp): + with pytest.raises(ConnectorMissingCredentialError): + connector.validate_connector_settings() + + +@pytest.mark.p2 +def test_validate_403_raises_insufficient_permissions(): + connector = OneDriveConnector() + connector._access_token = "tok" + + mock_resp = MagicMock() + mock_resp.status_code = 403 + mock_resp.ok = False + + with patch.object(connector, "_get", return_value=mock_resp): + with pytest.raises(InsufficientPermissionsError): + connector.validate_connector_settings() + + +@pytest.mark.p2 +def test_validate_unexpected_status_raises(): + connector = OneDriveConnector() + connector._access_token = "tok" + + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.ok = False + mock_resp.text = "internal error" + + with patch.object(connector, "_get", return_value=mock_resp): + with pytest.raises(UnexpectedValidationError): + connector.validate_connector_settings() + + +# --------------------------------------------------------------------------- +# Checkpoint helpers +# --------------------------------------------------------------------------- + +@pytest.mark.p2 +def test_build_dummy_checkpoint(): + connector = OneDriveConnector() + ckpt = connector.build_dummy_checkpoint() + assert isinstance(ckpt, OneDriveCheckpoint) + assert ckpt.has_more is True + assert ckpt.delta_links == {} + + +@pytest.mark.p2 +def test_validate_checkpoint_json_invalid_returns_dummy(): + connector = OneDriveConnector() + ckpt = connector.validate_checkpoint_json("not-json") + assert isinstance(ckpt, OneDriveCheckpoint) + + +# --------------------------------------------------------------------------- +# _iter_documents (via poll_source) +# --------------------------------------------------------------------------- + +def _ok(json_value): + """Tiny helper: build a successful MagicMock response with .ok / .json().""" + resp = MagicMock() + resp.ok = True + resp.status_code = 200 + resp.json.return_value = json_value + return resp + + +def _err(status: int, text: str = ""): + """Tiny helper: build a non-ok MagicMock response for failure tests.""" + resp = MagicMock() + resp.ok = False + resp.status_code = status + resp.text = text + return resp + + +@pytest.mark.p1 +def test_poll_source_yields_supported_files(): + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + delta_resp = _ok({ + "value": [ + { + "id": "file-1", + "name": "report.docx", + "file": {}, + "lastModifiedDateTime": "2026-05-20T10:00:00Z", + "webUrl": "https://example.com/report.docx", + "size": 1024, + "createdBy": {"user": {"displayName": "Alice"}}, + } + ], + "@odata.deltaLink": "https://graph.microsoft.com/delta-link", + }) + + with patch.object(connector, "_get", side_effect=[drives_resp, delta_resp]): + batches = list(connector.poll_source(0.0, 9999999999.0)) + + assert len(batches) == 1 + assert len(batches[0]) == 1 + assert batches[0][0].semantic_identifier == "report.docx" + + +@pytest.mark.p2 +def test_poll_source_skips_unsupported_extensions(): + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + delta_resp = _ok({ + "value": [ + { + "id": "img-1", + "name": "photo.png", # not in _SUPPORTED_EXTENSIONS + "file": {}, + "lastModifiedDateTime": "2026-05-20T10:00:00Z", + "webUrl": "https://example.com/photo.png", + "size": 512, + } + ], + }) + + with patch.object(connector, "_get", side_effect=[drives_resp, delta_resp]): + batches = list(connector.poll_source(0.0, 9999999999.0)) + + assert batches == [] + + +@pytest.mark.p2 +def test_poll_source_skips_deleted_items(): + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + delta_resp = _ok({ + "value": [ + { + "id": "file-del", + "name": "gone.docx", + "file": {}, + "deleted": {"state": "deleted"}, + } + ], + }) + + with patch.object(connector, "_get", side_effect=[drives_resp, delta_resp]): + batches = list(connector.poll_source(0.0, 9999999999.0)) + + assert batches == [] + + +# --------------------------------------------------------------------------- +# Non-2xx Graph responses must raise (no silent partial syncs) +# --------------------------------------------------------------------------- + +@pytest.mark.p1 +def test_iter_documents_raises_on_graph_http_500(): + """A 500 from the delta endpoint must surface — silently breaking would + advance the checkpoint past data we never saw.""" + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + delta_resp = _err(500, "internal error") + + with patch.object(connector, "_get", side_effect=[drives_resp, delta_resp]): + with pytest.raises(UnexpectedValidationError): + list(connector.poll_source(0.0, 9999999999.0)) + + +@pytest.mark.p1 +def test_iter_documents_raises_on_graph_http_429(): + """Throttling must propagate so the orchestrator retries instead of + treating the run as a clean empty sync.""" + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + throttled = _err(429, "Too Many Requests") + + with patch.object(connector, "_get", side_effect=[drives_resp, throttled]): + with pytest.raises(UnexpectedValidationError): + list(connector.poll_source(0.0, 9999999999.0)) + + +@pytest.mark.p1 +def test_list_drive_ids_raises_on_http_error(): + connector = OneDriveConnector() + connector._access_token = "tok" + + with patch.object(connector, "_get", side_effect=[_err(503, "unavailable")]): + with pytest.raises(UnexpectedValidationError): + connector._list_drive_ids() + + +# --------------------------------------------------------------------------- +# retrieve_all_slim_docs_perm_sync: yields SlimDocument batches for prune +# --------------------------------------------------------------------------- + +@pytest.mark.p1 +def test_retrieve_slim_docs_yields_slimdocument_batches(): + """The prune collector does file_list.extend(batch) and reads .id on each + retained item, so retrieve_all_slim_docs_perm_sync must yield lists of + SlimDocument (not lists of plain dicts and not bare dicts).""" + connector = OneDriveConnector(batch_size=2) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + delta_resp = _ok({ + "value": [ + {"id": "f1", "name": "a.docx", "file": {}}, + {"id": "f2", "name": "b.pdf", "file": {}}, + {"id": "f3", "name": "c.txt", "file": {}}, + ], + }) + + with patch.object(connector, "_get", side_effect=[drives_resp, delta_resp]): + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + + # batch_size=2 -> first batch has 2 items, second has the trailing one + assert len(batches) == 2 + assert [len(b) for b in batches] == [2, 1] + flat = [item for batch in batches for item in batch] + assert all(isinstance(item, SlimDocument) for item in flat) + assert {item.id for item in flat} == {"f1", "f2", "f3"} + + +@pytest.mark.p2 +def test_retrieve_slim_docs_skips_folders_and_deleted(): + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + delta_resp = _ok({ + "value": [ + {"id": "folder-1", "name": "Docs", "folder": {}}, # folder, no "file" + {"id": "del-1", "name": "gone.pdf", "file": {}, "deleted": {"state": "deleted"}}, + {"id": "ok-1", "name": "keep.pdf", "file": {}}, + ], + }) + + with patch.object(connector, "_get", side_effect=[drives_resp, delta_resp]): + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + + flat = [item for batch in batches for item in batch] + assert [item.id for item in flat] == ["ok-1"] + + +@pytest.mark.p2 +def test_retrieve_slim_docs_raises_on_http_error(): + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + failed = _err(502, "bad gateway") + + with patch.object(connector, "_get", side_effect=[drives_resp, failed]): + with pytest.raises(UnexpectedValidationError): + list(connector.retrieve_all_slim_docs_perm_sync()) + + +@pytest.mark.p2 +def test_retrieve_slim_docs_requires_credentials(): + connector = OneDriveConnector() + # _access_token is None + with pytest.raises(ConnectorMissingCredentialError): + list(connector.retrieve_all_slim_docs_perm_sync()) + + +# --------------------------------------------------------------------------- +# load_from_checkpoint: resumes from delta_links and honors start floor +# --------------------------------------------------------------------------- + +@pytest.mark.p1 +def test_load_from_checkpoint_uses_persisted_delta_link(): + """When the checkpoint carries a delta_link for a drive, the connector + must hit THAT URL — not the drive root — so incremental runs resume + from where the previous one left off.""" + connector = OneDriveConnector(batch_size=10) + connector._access_token = "tok" + + saved_delta = "https://graph.microsoft.com/v1.0/drives/drive-1/root/delta?token=ABC" + ckpt = OneDriveCheckpoint(has_more=True, delta_links={"drive-1": saved_delta}) + + drives_resp = _ok({"value": [{"id": "drive-1"}]}) + delta_resp = _ok({"value": [], "@odata.deltaLink": "next-link"}) + + visited: list[str] = [] + + def _stub_get(url): + visited.append(url) + return drives_resp if url.endswith("/drives") else delta_resp + + with patch.object(connector, "_get", side_effect=_stub_get): + list(connector.load_from_checkpoint(0.0, 0.0, ckpt)) + + # Second call (after /drives) is the delta fetch — it must be the saved + # delta link, not the drive root. + assert visited[1] == saved_delta + assert ckpt.delta_links == {"drive-1": "next-link"} diff --git a/web/src/assets/svg/data-source/onedrive.svg b/web/src/assets/svg/data-source/onedrive.svg new file mode 100644 index 0000000000..6b1b0a2088 --- /dev/null +++ b/web/src/assets/svg/data-source/onedrive.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 82649c7309..4b38f1acbf 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1385,6 +1385,16 @@ Example: Virtual Hosted Style`, 'Datetime/timestamp column for incremental sync. Only rows modified after the last sync will be fetched.', rest_apiDescription: 'Connect any REST API endpoint as a data source using a flexible, configuration-driven connector.', + onedriveDescription: + 'Connect OneDrive or OneDrive for Business to index files and folders via Microsoft Graph delta queries.', + onedriveTenantIdTip: + 'Azure Active Directory tenant ID (Directory ID) of the Microsoft 365 organisation.', + onedriveClientIdTip: + 'Application (client) ID of the Azure AD app registration with Files.Read.All permission.', + onedriveClientSecretTip: + 'Client secret value generated in the Azure AD app registration.', + onedriveFolderPathTip: + 'Optional sub-folder path to limit indexing (e.g. /Documents/Reports). Leave blank to index the entire drive.', restApiQueryParamsTip: 'Key=value pairs (one per line) sent as URL query parameters. Use this instead of embedding params in the URL.', restApiHeadersTip: diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index 454bece5da..7a0ec91f8f 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -1099,8 +1099,12 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系 gmailTokenTip: '请上传由 Google Console 生成的 OAuth JSON。如果仅包含 client credentials,请通过浏览器授权一次以获取长期有效的刷新 Token。', dropboxDescription: '连接 Dropbox,同步指定账号下的文件与文件夹。', - teamsDescription: - '通过 Microsoft Graph 连接 Microsoft Teams,同步频道帖子与回复。', + onedriveDescription: '连接 OneDrive 或 OneDrive for Business,通过 Microsoft Graph delta 查询索引文件和文件夹。', + onedriveTenantIdTip: 'Microsoft 365 组织的 Azure Active Directory 租户 ID(目录 ID)。', + onedriveClientIdTip: '拥有 Files.Read.All 权限的 Azure AD 应用注册的应用程序(客户端)ID。', + onedriveClientSecretTip: '在 Azure AD 应用注册中生成的客户端密钥值。', + onedriveFolderPathTip: '可选的子文件夹路径,用于限制索引范围(例如 /Documents/Reports)。留空则索引整个云盘。', + teamsDescription: '通过 Microsoft Graph 连接 Microsoft Teams,同步频道帖子与回复。', teamsTenantIdTip: 'Azure AD 租户 ID。需要具备 Team.ReadBasic.All 与 ChannelMessage.Read.All 应用权限(管理员同意)的应用。', slackDescription: '连接你的 Slack 工作区,同步频道消息与讨论串。', diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 6f55db8da1..2938b6ee47 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -43,6 +43,7 @@ export enum DataSourceKey { POSTGRESQL = 'postgresql', REST_API = 'rest_api', RSS = 'rss', + ONEDRIVE = 'onedrive', TEAMS = 'teams', SLACK = 'slack', SHAREPOINT = 'sharepoint', @@ -129,6 +130,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.MOODLE]: { syncDeletedFiles: true, }, + [DataSourceKey.ONEDRIVE]: { + syncDeletedFiles: true, + }, [DataSourceKey.TEAMS]: { syncDeletedFiles: true, }, @@ -317,6 +321,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.POSTGRESQL}Description`), icon: , }, + [DataSourceKey.ONEDRIVE]: { + name: 'OneDrive', + description: t(`setting.${DataSourceKey.ONEDRIVE}Description`), + icon: , + }, }; }; @@ -403,6 +412,49 @@ export const getCommonExtraDefaultValues = () => ({ }); export const DataSourceFormFields = { + [DataSourceKey.ONEDRIVE]: [ + { + label: 'Tenant ID', + name: 'config.credentials.tenant_id', + type: FormFieldType.Text, + required: true, + placeholder: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', + tooltip: t('setting.onedriveTenantIdTip'), + }, + { + label: 'Client ID', + name: 'config.credentials.client_id', + type: FormFieldType.Text, + required: true, + placeholder: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', + tooltip: t('setting.onedriveClientIdTip'), + }, + { + label: 'Client Secret', + name: 'config.credentials.client_secret', + type: FormFieldType.Password, + required: true, + tooltip: t('setting.onedriveClientSecretTip'), + }, + { + label: 'Folder Path (optional)', + name: 'config.folder_path', + type: FormFieldType.Text, + required: false, + placeholder: '/Documents/Reports', + tooltip: t('setting.onedriveFolderPathTip'), + }, + { + label: 'Batch Size', + name: 'config.batch_size', + type: FormFieldType.Number, + required: false, + validation: { + min: 1, + message: 'Batch Size must be at least 1', + }, + }, + ], [DataSourceKey.RSS]: [ { label: 'Feed URL', @@ -1843,6 +1895,19 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.ONEDRIVE]: { + name: '', + source: DataSourceKey.ONEDRIVE, + config: { + folder_path: '', + batch_size: 2, + credentials: { + tenant_id: '', + client_id: '', + client_secret: '', + }, + }, + }, [DataSourceKey.REST_API]: { name: '', source: DataSourceKey.REST_API,