diff --git a/common/constants.py b/common/constants.py index 60b09aa8bb..c98794e5e9 100644 --- a/common/constants.py +++ b/common/constants.py @@ -155,6 +155,7 @@ class FileSource(StrEnum): POSTGRESQL = "postgresql" DINGTALK_AI_TABLE = "dingtalk_ai_table" ONEDRIVE = "onedrive" + OUTLOOK = "outlook" class PipelineTaskType(StrEnum): diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index 2a3682747d..bfd365e095 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -35,6 +35,7 @@ from .google_drive.connector import GoogleDriveConnector from .jira.connector import JiraConnector from .sharepoint_connector import SharePointConnector from .onedrive_connector import OneDriveConnector +from .outlook_connector import OutlookConnector from .teams_connector import TeamsConnector from .moodle_connector import MoodleConnector from .airtable_connector import AirtableConnector @@ -69,6 +70,7 @@ __all__ = [ "JiraConnector", "SharePointConnector", "OneDriveConnector", + "OutlookConnector", "TeamsConnector", "MoodleConnector", "BlobType", diff --git a/common/data_source/config.py b/common/data_source/config.py index c445844b70..d0238c505e 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -70,6 +70,7 @@ class DocumentSource(str, Enum): POSTGRESQL = "postgresql" DINGTALK_AI_TABLE = "dingtalk_ai_table" ONEDRIVE = "onedrive" + OUTLOOK = "outlook" class FileOrigin(str, Enum): diff --git a/common/data_source/outlook_connector.py b/common/data_source/outlook_connector.py new file mode 100644 index 0000000000..395f03c31a --- /dev/null +++ b/common/data_source/outlook_connector.py @@ -0,0 +1,482 @@ +"""Outlook / Microsoft 365 mail data source connector""" + +import logging +from datetime import datetime, timezone +from typing import Any, Generator + +import msal +import requests + +from common.data_source.config import INDEX_BATCH_SIZE +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + InsufficientPermissionsError, + UnexpectedValidationError, +) +from common.data_source.interfaces import ( + CheckpointedConnectorWithPermSync, + SecondsSinceUnixEpoch, + SlimConnectorWithPermSync, +) +from common.data_source.models import ( + BasicExpertInfo, + ConnectorCheckpoint, + Document, + SlimDocument, +) + +logger = logging.getLogger(__name__) + +_GRAPH_BASE = "https://graph.microsoft.com/v1.0" +_GRAPH_SCOPE = ["https://graph.microsoft.com/.default"] + +# Default folder when none specified; "inbox" is a well-known folder ID. +_DEFAULT_FOLDER = "inbox" + + +def _redact(value: str | None) -> str: + """Return a privacy-preserving representation of a UPN / email / object id. + + Used for log lines so a single failure trace doesn't leak the entire + list of mailbox owners. The first two characters of the local part are + preserved as a debugging hint; the rest of the local part and the + domain are masked. For non-email values (GUIDs, object IDs) we keep + the first 4 chars to disambiguate which mailbox failed. + """ + if not value: + return "" + if "@" in value: + local, _, domain = value.partition("@") + if len(local) <= 2: + local_mask = local + else: + local_mask = local[:2] + "***" + return f"{local_mask}@***" + return f"{value[:4]}***" if len(value) > 4 else "***" + + +class OutlookCheckpoint(ConnectorCheckpoint): + """Outlook-specific checkpoint tracking delta links per user mailbox.""" + delta_links: dict[str, str] | None = None + + +def _strip_html(html: str) -> str: + """Tiny HTML-to-text fallback. Avoids pulling in BeautifulSoup just for this.""" + if not html: + return "" + text = html + # remove script/style blocks crudely + for tag in ("script", "style"): + while True: + start = text.lower().find(f"<{tag}") + if start == -1: + break + end = text.lower().find(f"", start) + if end == -1: + text = text[:start] + break + text = text[:start] + text[end + len(tag) + 3 :] + # drop remaining tags + out: list[str] = [] + in_tag = False + for ch in text: + if ch == "<": + in_tag = True + continue + if ch == ">": + in_tag = False + continue + if not in_tag: + out.append(ch) + return "".join(out).strip() + + +class OutlookConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync): + """ + Outlook / Microsoft 365 mail connector. + + Uses Microsoft Graph delta queries against + `/users/{id}/mailFolders/{folder}/messages/delta`, persisting per-user + delta links so incremental syncs only fetch changed messages. + + Required Azure AD application permission: + - Mail.Read + - User.Read.All (only needed when no explicit user_ids are provided, + so the connector can enumerate mailboxes) + """ + + def __init__( + self, + batch_size: int = INDEX_BATCH_SIZE, + folder: str = _DEFAULT_FOLDER, + user_ids: list[str] | None = None, + ) -> None: + self.batch_size = batch_size + self.folder = folder or _DEFAULT_FOLDER + # Optional list of UPNs / object IDs to limit which mailboxes are synced. + self.user_ids = user_ids or [] + self._access_token: str | None = None + self._tenant_id: str | None = None + + # ------------------------------------------------------------------ + # Auth + # ------------------------------------------------------------------ + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + tenant_id = credentials.get("tenant_id") + client_id = credentials.get("client_id") + client_secret = credentials.get("client_secret") + + if not all([tenant_id, client_id, client_secret]): + raise ConnectorMissingCredentialError( + "Outlook credentials are incomplete (tenant_id, client_id, " + "client_secret required)" + ) + + self._tenant_id = tenant_id + + app = msal.ConfidentialClientApplication( + client_id=client_id, + client_credential=client_secret, + authority=f"https://login.microsoftonline.com/{tenant_id}", + ) + result = app.acquire_token_for_client(scopes=_GRAPH_SCOPE) + + if "access_token" not in result: + error = result.get("error_description", result.get("error", "unknown")) + raise ConnectorMissingCredentialError( + f"Failed to acquire Outlook access token: {error}" + ) + + self._access_token = result["access_token"] + return None + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + def validate_connector_settings(self) -> None: + if not self._access_token: + raise ConnectorMissingCredentialError("Outlook") + + # Probe: list one user (or check explicit user mailbox). + probe_url = ( + f"{_GRAPH_BASE}/users/{self.user_ids[0]}" + if self.user_ids + else f"{_GRAPH_BASE}/users?$top=1" + ) + resp = self._get(probe_url) + + if resp.status_code == 401: + raise ConnectorMissingCredentialError( + "Outlook access token is invalid or expired." + ) + if resp.status_code == 403: + raise InsufficientPermissionsError( + "The service principal lacks the 'Mail.Read' (and possibly " + "'User.Read.All') permission required by the Outlook connector." + ) + if resp.status_code == 404 and self.user_ids: + raise ConnectorValidationError( + f"Configured Outlook mailbox '{self.user_ids[0]}' does not exist " + "in this tenant." + ) + if not resp.ok: + raise UnexpectedValidationError( + f"Outlook validation failed (HTTP {resp.status_code}): " + f"{resp.text[:200]}" + ) + + # ------------------------------------------------------------------ + # Checkpoint helpers + # ------------------------------------------------------------------ + + def build_dummy_checkpoint(self) -> OutlookCheckpoint: + return OutlookCheckpoint(has_more=True, delta_links={}) + + def validate_checkpoint_json(self, checkpoint_json: str) -> OutlookCheckpoint: + try: + return OutlookCheckpoint.model_validate_json(checkpoint_json) + except Exception: + return self.build_dummy_checkpoint() + + # ------------------------------------------------------------------ + # Core data loading + # ------------------------------------------------------------------ + + def poll_source( + self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch + ) -> Any: + """Return messages received at or after *start* (epoch seconds). + + Kept for callers that prefer the time-window interface; internally + defers to the same delta-walk used by load_from_checkpoint. + """ + return self._iter_documents(since_epoch=start) + + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ConnectorCheckpoint, + ) -> Any: + """Resume from *checkpoint*'s delta_links and apply the start floor. + + The delta_links map carries per-user @odata.deltaLink values from the + previous run; when present the walk resumes from those links instead + of crawling each mailbox from the root, which is what makes + incremental syncs cheap. The start_time is still applied as a + receivedDateTime floor so callers that pass a window (and have no + persisted delta link yet) don't re-process everything. + """ + if not isinstance(checkpoint, OutlookCheckpoint): + checkpoint = self.build_dummy_checkpoint() + since = start if start else None + return self._iter_documents(checkpoint=checkpoint, since_epoch=since) + + def load_from_checkpoint_with_perm_sync( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ConnectorCheckpoint, + ) -> Any: + return self.load_from_checkpoint(start, end, checkpoint) + + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> Generator[list[SlimDocument], None, None]: + """Yield batches of slim documents for prune / permission sync. + + The prune collector in rag/svr/sync_data_source._collect_prune_snapshot + does file_list.extend(batch) and then + cleanup_stale_documents_for_task reads `.id` on every retained item + (api/db/services/connector_service.py:174). Yielding plain dicts + appended dict keys to file_list and then failed attribute access; + yielding list[SlimDocument] honors both contracts. + """ + if not self._access_token: + raise ConnectorMissingCredentialError("Outlook") + + batch: list[SlimDocument] = [] + for user_id in self._list_user_ids(): + url: str | None = self._delta_url(user_id) + while url: + data = self._get_json(url, context=f"prune user={_redact(user_id)}") + for msg in data.get("value", []): + if msg.get("@removed"): + continue + msg_id = msg.get("id") + if not msg_id: + continue + if callback: + callback(msg_id, msg.get("subject", "")) + batch.append(SlimDocument(id=msg_id)) + if len(batch) >= self.batch_size: + yield batch + batch = [] + url = data.get("@odata.nextLink") + if batch: + yield batch + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _get(self, url: str) -> requests.Response: + return requests.get( + url, + headers={"Authorization": f"Bearer {self._access_token}"}, + timeout=60, + ) + + def _get_json(self, url: str, *, context: str) -> dict: + """GET *url* and decode JSON. Raise on non-2xx so the caller never + treats a 429 / 5xx as an empty page and silently advances the + checkpoint past missing data. + """ + resp = self._get(url) + if not resp.ok: + body_snippet = resp.text[:200] if resp.text else "" + logger.error( + "Outlook Graph request failed (%s): HTTP %s body=%s", + context, + resp.status_code, + body_snippet, + ) + raise UnexpectedValidationError( + f"Outlook Graph request failed ({context}): " + f"HTTP {resp.status_code} {body_snippet}" + ) + try: + return resp.json() + except ValueError as exc: + raise UnexpectedValidationError( + f"Outlook Graph response is not JSON ({context}): {exc}" + ) + + def _list_user_ids(self) -> list[str]: + """Return mailbox identifiers to sync.""" + if self.user_ids: + return list(self.user_ids) + + ids: list[str] = [] + url: str | None = f"{_GRAPH_BASE}/users?$select=id,userPrincipalName,mail" + while url: + data = self._get_json(url, context="list users") + for user in data.get("value", []): + # Skip users with no mailbox provisioned. + if user.get("mail") or user.get("userPrincipalName"): + ids.append(user["id"]) + url = data.get("@odata.nextLink") + return ids + + def _delta_url(self, user_id: str, delta_link: str | None = None) -> str: + if delta_link: + return delta_link + return ( + f"{_GRAPH_BASE}/users/{user_id}/mailFolders/" + f"{self.folder}/messages/delta" + ) + + def _message_to_document( + self, msg: dict[str, Any], user_id: str + ) -> Document | None: + subject: str = msg.get("subject") or "(no subject)" + + body_obj = msg.get("body") or {} + body_content_type: str = body_obj.get("contentType", "text").lower() + body_content: str = body_obj.get("content") or "" + if body_content_type == "html": + body_text = _strip_html(body_content) + else: + body_text = body_content + + received_str: str = msg.get("receivedDateTime") or "" + received_dt: datetime | None = None + if received_str: + try: + received_dt = datetime.fromisoformat( + received_str.replace("Z", "+00:00") + ) + except ValueError: + pass + + from_addr = ( + msg.get("from", {}).get("emailAddress", {}) if msg.get("from") else {} + ) + to_recipients: list[str] = [ + r.get("emailAddress", {}).get("address", "") + for r in (msg.get("toRecipients") or []) + if r.get("emailAddress", {}).get("address") + ] + cc_recipients: list[str] = [ + r.get("emailAddress", {}).get("address", "") + for r in (msg.get("ccRecipients") or []) + if r.get("emailAddress", {}).get("address") + ] + + header_lines = [ + f"From: {from_addr.get('name', '')} <{from_addr.get('address', '')}>", + f"To: {', '.join(to_recipients)}", + ] + if cc_recipients: + header_lines.append(f"Cc: {', '.join(cc_recipients)}") + header_lines.append(f"Subject: {subject}") + + section_text = "\n".join(header_lines) + "\n\n" + body_text + + primary_owners: list[BasicExpertInfo] = [] + if from_addr.get("address"): + primary_owners.append( + BasicExpertInfo( + email=from_addr["address"], + display_name=from_addr.get("name") or None, + ) + ) + + blob = section_text.encode("utf-8") + return Document( + id=msg["id"], + source="outlook", + semantic_identifier=subject, + extension=".html" if body_content_type == "html" else ".txt", + blob=blob, + doc_updated_at=received_dt or datetime.now(timezone.utc), + size_bytes=len(blob), + primary_owners=primary_owners or None, + metadata={ + "user_id": user_id, + "folder": self.folder, + "from": from_addr.get("address", ""), + "to": ",".join(to_recipients), + "cc": ",".join(cc_recipients), + "has_attachments": str(bool(msg.get("hasAttachments"))), + "conversation_id": msg.get("conversationId", ""), + "web_link": msg.get("webLink", ""), + }, + ) + + def _iter_documents( + self, + checkpoint: OutlookCheckpoint | None = None, + since_epoch: float | None = None, + ): + """Generator that yields batches of Document objects.""" + delta_links: dict[str, str] = {} + if checkpoint and checkpoint.delta_links: + delta_links = dict(checkpoint.delta_links) + + batch: list[Document] = [] + + for user_id in self._list_user_ids(): + start_url = self._delta_url(user_id, delta_links.get(user_id)) + url: str | None = start_url + next_delta: str | None = None + + while url: + data = self._get_json( + url, context=f"delta user={_redact(user_id)}" + ) + + for msg in data.get("value", []): + # Skip removed/deleted messages signalled by delta semantics + if msg.get("@removed"): + continue + + received_str = msg.get("receivedDateTime") or "" + received_ts: float | None = None + if received_str: + try: + received_ts = datetime.fromisoformat( + received_str.replace("Z", "+00:00") + ).timestamp() + except ValueError: + pass + + if since_epoch and received_ts and received_ts < since_epoch: + continue + + doc = self._message_to_document(msg, user_id) + if doc is None: + continue + if doc.doc_updated_at is None: + doc.doc_updated_at = datetime.now(timezone.utc) + batch.append(doc) + if len(batch) >= self.batch_size: + yield batch + batch = [] + + next_delta = data.get("@odata.deltaLink") + url = data.get("@odata.nextLink") + + if next_delta: + delta_links[user_id] = next_delta + + if batch: + yield batch + + if checkpoint is not None: + checkpoint.delta_links = delta_links + checkpoint.has_more = False diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 9ce2924712..cfc86a2d27 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -62,6 +62,7 @@ from common.data_source import ( DingTalkAITableConnector, RestAPIConnector, OneDriveConnector, + OutlookConnector, TeamsConnector, SlackConnector, SharePointConnector, @@ -84,6 +85,23 @@ MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5")) task_limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS) +def _redact_mailbox(value: str) -> str: + """Return a privacy-preserving representation of a UPN / email / object id. + + Sync logs surface connector configuration verbatim, so leaking the + full mailbox list of a tenant is enough to inventory their org from + a single log file. Preserve the first two characters of the local + part as a debugging hint and mask the rest. + """ + if not value: + return "" + if "@" in value: + local, _, _domain = value.partition("@") + local_mask = local if len(local) <= 2 else local[:2] + "***" + return f"{local_mask}@***" + return f"{value[:4]}***" if len(value) > 4 else "***" + + class SyncBase: """ Base class for all data source synchronization connectors. @@ -1044,6 +1062,72 @@ class OneDrive(SyncBase): return wrapper() +class Outlook(SyncBase): + SOURCE_NAME: str = FileSource.OUTLOOK + + async def _generate(self, task: dict): + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + raw_user_ids = self.conf.get("user_ids") + if isinstance(raw_user_ids, str): + user_ids = [u.strip() for u in raw_user_ids.split(",") if u.strip()] + elif isinstance(raw_user_ids, list): + user_ids = [str(u).strip() for u in raw_user_ids if str(u).strip()] + else: + user_ids = [] + + self.connector = OutlookConnector( + batch_size=batch_size, + folder=self.conf.get("folder") or "inbox", + user_ids=user_ids or None, + ) + self.connector.load_credentials(self.conf["credentials"]) + + # Always route through load_from_checkpoint so the connector owns the + # delta-link bookkeeping; incremental runs pass the previous poll + # range start as the receivedDateTime floor while the same delta + # walk drives both modes. poll_source disregarded the checkpoint + # entirely, which would have re-walked every mailbox each run. + if task["reindex"] == "1" or not task["poll_range_start"]: + start_ts = 0.0 + else: + start_ts = task["poll_range_start"].timestamp() + end_ts = datetime.now(timezone.utc).timestamp() + checkpoint = self.connector.build_dummy_checkpoint() + document_batch_generator = self.connector.load_from_checkpoint( + start_ts, end_ts, checkpoint + ) + + # Redact mailbox identifiers — full UPN / email lists in connector + # logs leak PII (the entire org's mail directory ends up in + # tail-of-logs output). Surface the folder, the count, and a small + # masked preview so operators can still spot a misconfigured run. + if user_ids: + preview = ",".join(_redact_mailbox(u) for u in user_ids[:3]) + if len(user_ids) > 3: + preview = f"{preview},+{len(user_ids) - 3} more" + details = "{}@{} users (preview: {})".format( + self.conf.get("folder", "inbox"), + len(user_ids), + preview, + ) + else: + details = "{}@".format(self.conf.get("folder", "inbox")) + self.log_connection("Outlook", details, task) + + def wrapper(): + for document_batch in document_batch_generator: + yield document_batch + + return wrapper() + + class Slack(SyncBase): SOURCE_NAME: str = FileSource.SLACK @@ -1905,6 +1989,7 @@ func_factory = { FileSource.JIRA: Jira, FileSource.SHAREPOINT: SharePoint, FileSource.ONEDRIVE: OneDrive, + FileSource.OUTLOOK: Outlook, FileSource.SLACK: Slack, FileSource.TEAMS: Teams, FileSource.MOODLE: Moodle, diff --git a/test/unit_test/data_source/test_outlook_connector_unit.py b/test/unit_test/data_source/test_outlook_connector_unit.py new file mode 100644 index 0000000000..37ebe23b7d --- /dev/null +++ b/test/unit_test/data_source/test_outlook_connector_unit.py @@ -0,0 +1,511 @@ +"""Unit tests for OutlookConnector.""" + +import pytest +from unittest.mock import MagicMock, patch + +from common.data_source.outlook_connector import ( + OutlookCheckpoint, + OutlookConnector, + _redact, + _strip_html, +) +from common.data_source.models import SlimDocument +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + InsufficientPermissionsError, + UnexpectedValidationError, +) + + +_GOOD_CREDS = { + "tenant_id": "tenant-123", + "client_id": "client-abc", + "client_secret": "secret-xyz", +} + + +# --------------------------------------------------------------------------- +# _strip_html +# --------------------------------------------------------------------------- + +@pytest.mark.p3 +def test_strip_html_removes_tags_and_script(): + html = "

Hello world

" + assert "evil" not in _strip_html(html) + assert "Hello world" in _strip_html(html) + + +@pytest.mark.p3 +def test_strip_html_empty_returns_empty(): + assert _strip_html("") == "" + + +# --------------------------------------------------------------------------- +# load_credentials +# --------------------------------------------------------------------------- + +@pytest.mark.p2 +def test_load_credentials_missing_fields_raises(): + connector = OutlookConnector() + with pytest.raises(ConnectorMissingCredentialError): + connector.load_credentials({"tenant_id": "t", "client_id": "c"}) + + +@pytest.mark.p1 +def test_load_credentials_success(): + connector = OutlookConnector() + mock_app = MagicMock() + mock_app.acquire_token_for_client.return_value = {"access_token": "tok"} + + with patch( + "common.data_source.outlook_connector.msal.ConfidentialClientApplication", + return_value=mock_app, + ): + result = connector.load_credentials(_GOOD_CREDS) + + assert result is None + assert connector._access_token == "tok" + assert connector._tenant_id == "tenant-123" + + +@pytest.mark.p2 +def test_load_credentials_msal_failure_raises(): + connector = OutlookConnector() + mock_app = MagicMock() + mock_app.acquire_token_for_client.return_value = { + "error": "invalid_client", + "error_description": "AADSTS70011", + } + + with patch( + "common.data_source.outlook_connector.msal.ConfidentialClientApplication", + return_value=mock_app, + ): + with pytest.raises(ConnectorMissingCredentialError, match="AADSTS70011"): + connector.load_credentials(_GOOD_CREDS) + + +# --------------------------------------------------------------------------- +# validate_connector_settings +# --------------------------------------------------------------------------- + +@pytest.mark.p2 +def test_validate_without_credentials_raises(): + connector = OutlookConnector() + with pytest.raises(ConnectorMissingCredentialError): + connector.validate_connector_settings() + + +@pytest.mark.p1 +def test_validate_success_tenant_wide(): + connector = OutlookConnector() + connector._access_token = "tok" + + mock_resp = MagicMock(status_code=200, ok=True) + mock_resp.json.return_value = {"value": [{"id": "user-1"}]} + + with patch.object(connector, "_get", return_value=mock_resp) as mock_get: + connector.validate_connector_settings() + called_url = mock_get.call_args[0][0] + assert "/users?$top=1" in called_url + + +@pytest.mark.p1 +def test_validate_success_specific_user(): + connector = OutlookConnector(user_ids=["alice@example.com"]) + connector._access_token = "tok" + + mock_resp = MagicMock(status_code=200, ok=True) + mock_resp.json.return_value = {"id": "user-1"} + + with patch.object(connector, "_get", return_value=mock_resp) as mock_get: + connector.validate_connector_settings() + called_url = mock_get.call_args[0][0] + assert "alice@example.com" in called_url + + +@pytest.mark.p2 +def test_validate_401_raises_missing_credential(): + connector = OutlookConnector() + connector._access_token = "bad" + mock_resp = MagicMock(status_code=401, ok=False) + with patch.object(connector, "_get", return_value=mock_resp): + with pytest.raises(ConnectorMissingCredentialError): + connector.validate_connector_settings() + + +@pytest.mark.p2 +def test_validate_403_raises_insufficient_permissions(): + connector = OutlookConnector() + connector._access_token = "tok" + mock_resp = MagicMock(status_code=403, ok=False) + with patch.object(connector, "_get", return_value=mock_resp): + with pytest.raises(InsufficientPermissionsError): + connector.validate_connector_settings() + + +@pytest.mark.p2 +def test_validate_404_with_user_ids_raises_validation_error(): + connector = OutlookConnector(user_ids=["ghost@example.com"]) + connector._access_token = "tok" + mock_resp = MagicMock(status_code=404, ok=False) + with patch.object(connector, "_get", return_value=mock_resp): + with pytest.raises(ConnectorValidationError, match="ghost@example.com"): + connector.validate_connector_settings() + + +@pytest.mark.p2 +def test_validate_5xx_raises_unexpected(): + connector = OutlookConnector() + connector._access_token = "tok" + mock_resp = MagicMock(status_code=503, ok=False) + mock_resp.text = "service unavailable" + with patch.object(connector, "_get", return_value=mock_resp): + with pytest.raises(UnexpectedValidationError): + connector.validate_connector_settings() + + +# --------------------------------------------------------------------------- +# Checkpoint helpers +# --------------------------------------------------------------------------- + +@pytest.mark.p2 +def test_build_dummy_checkpoint(): + connector = OutlookConnector() + ckpt = connector.build_dummy_checkpoint() + assert isinstance(ckpt, OutlookCheckpoint) + assert ckpt.has_more is True + assert ckpt.delta_links == {} + + +@pytest.mark.p2 +def test_validate_checkpoint_json_invalid_returns_dummy(): + connector = OutlookConnector() + ckpt = connector.validate_checkpoint_json("garbage") + assert isinstance(ckpt, OutlookCheckpoint) + + +# --------------------------------------------------------------------------- +# _list_user_ids +# --------------------------------------------------------------------------- + +@pytest.mark.p2 +def test_list_user_ids_returns_configured_ids(): + connector = OutlookConnector(user_ids=["a@x.com", "b@x.com"]) + connector._access_token = "tok" + assert connector._list_user_ids() == ["a@x.com", "b@x.com"] + + +@pytest.mark.p2 +def test_list_user_ids_paginates_when_unset(): + connector = OutlookConnector() + connector._access_token = "tok" + + page_1 = MagicMock(ok=True) + page_1.json.return_value = { + "value": [ + {"id": "u1", "userPrincipalName": "u1@x.com", "mail": "u1@x.com"}, + {"id": "u2-no-mail"}, # filtered out (no mail, no UPN) + ], + "@odata.nextLink": "https://graph.example/next", + } + page_2 = MagicMock(ok=True) + page_2.json.return_value = { + "value": [{"id": "u3", "userPrincipalName": "u3@x.com"}], + } + + with patch.object(connector, "_get", side_effect=[page_1, page_2]): + ids = connector._list_user_ids() + assert ids == ["u1", "u3"] + + +# --------------------------------------------------------------------------- +# _iter_documents (via poll_source) +# --------------------------------------------------------------------------- + +@pytest.mark.p1 +def test_poll_source_yields_messages(): + connector = OutlookConnector( + batch_size=10, user_ids=["alice@example.com"] + ) + connector._access_token = "tok" + + delta_resp = MagicMock(ok=True) + delta_resp.json.return_value = { + "value": [ + { + "id": "msg-1", + "subject": "Hello", + "body": {"contentType": "text", "content": "Body text"}, + "receivedDateTime": "2026-05-20T10:00:00Z", + "webLink": "https://outlook.office.com/mail/1", + "from": { + "emailAddress": {"name": "Bob", "address": "bob@example.com"} + }, + "toRecipients": [ + {"emailAddress": {"address": "alice@example.com"}} + ], + "ccRecipients": [], + "hasAttachments": False, + "conversationId": "conv-1", + } + ], + "@odata.deltaLink": "https://graph.example/delta-1", + } + + with patch.object(connector, "_get", return_value=delta_resp): + batches = list(connector.poll_source(0.0, 9999999999.0)) + + assert len(batches) == 1 + doc = batches[0][0] + assert doc.semantic_identifier == "Hello" + body = doc.blob.decode("utf-8") + assert "Bob" in body + assert "Body text" in body + assert doc.metadata["conversation_id"] == "conv-1" + + +@pytest.mark.p2 +def test_poll_source_filters_old_messages(): + connector = OutlookConnector( + batch_size=10, user_ids=["alice@example.com"] + ) + connector._access_token = "tok" + + delta_resp = MagicMock(ok=True) + delta_resp.json.return_value = { + "value": [ + { + "id": "old-msg", + "subject": "old", + "body": {"contentType": "text", "content": "x"}, + "receivedDateTime": "2020-01-01T00:00:00Z", + } + ], + } + + with patch.object(connector, "_get", return_value=delta_resp): + # since_epoch in 2030 -> 2020 message is older, must be skipped + batches = list(connector.poll_source(1893456000.0, 9999999999.0)) + assert batches == [] + + +@pytest.mark.p2 +def test_poll_source_skips_removed_messages(): + connector = OutlookConnector( + batch_size=10, user_ids=["alice@example.com"] + ) + connector._access_token = "tok" + + delta_resp = MagicMock(ok=True) + delta_resp.json.return_value = { + "value": [ + {"id": "removed", "@removed": {"reason": "deleted"}}, + { + "id": "kept", + "subject": "kept", + "body": {"contentType": "text", "content": "y"}, + "receivedDateTime": "2026-05-20T10:00:00Z", + }, + ], + } + + with patch.object(connector, "_get", return_value=delta_resp): + batches = list(connector.poll_source(0.0, 9999999999.0)) + + ids = [d.id for batch in batches for d in batch] + assert ids == ["kept"] + + +@pytest.mark.p2 +def test_poll_source_html_body_is_stripped(): + connector = OutlookConnector( + batch_size=10, user_ids=["alice@example.com"] + ) + connector._access_token = "tok" + + delta_resp = MagicMock(ok=True) + delta_resp.json.return_value = { + "value": [ + { + "id": "html-msg", + "subject": "html", + "body": { + "contentType": "html", + "content": "

Hello world

", + }, + "receivedDateTime": "2026-05-20T10:00:00Z", + } + ], + } + + with patch.object(connector, "_get", return_value=delta_resp): + batches = list(connector.poll_source(0.0, 9999999999.0)) + + text = batches[0][0].blob.decode("utf-8") + assert "

" not in text + assert "Hello world" in text + + +# --------------------------------------------------------------------------- +# Non-2xx Graph responses must raise (no silent partial syncs) +# --------------------------------------------------------------------------- + +def _ok(json_value): + resp = MagicMock(ok=True, status_code=200) + resp.json.return_value = json_value + return resp + + +def _err(status, text=""): + resp = MagicMock(ok=False, status_code=status) + resp.text = text + return resp + + +@pytest.mark.p1 +def test_iter_documents_raises_on_http_500(): + """A 500 from the delta endpoint must surface; silently breaking would + advance the checkpoint past data we never saw.""" + connector = OutlookConnector(batch_size=10, user_ids=["alice@example.com"]) + connector._access_token = "tok" + with patch.object(connector, "_get", side_effect=[_err(500, "boom")]): + with pytest.raises(UnexpectedValidationError): + list(connector.poll_source(0.0, 9999999999.0)) + + +@pytest.mark.p1 +def test_iter_documents_raises_on_http_429(): + """Throttling must propagate so the orchestrator retries instead of + treating the run as a clean empty sync.""" + connector = OutlookConnector(batch_size=10, user_ids=["alice@example.com"]) + connector._access_token = "tok" + with patch.object(connector, "_get", side_effect=[_err(429, "throttled")]): + with pytest.raises(UnexpectedValidationError): + list(connector.poll_source(0.0, 9999999999.0)) + + +@pytest.mark.p1 +def test_list_user_ids_raises_on_http_error(): + connector = OutlookConnector() # no user_ids -> hits Graph + connector._access_token = "tok" + with patch.object(connector, "_get", side_effect=[_err(503, "down")]): + with pytest.raises(UnexpectedValidationError): + connector._list_user_ids() + + +# --------------------------------------------------------------------------- +# retrieve_all_slim_docs_perm_sync: yields list[SlimDocument] for prune +# --------------------------------------------------------------------------- + +@pytest.mark.p1 +def test_retrieve_slim_docs_yields_slimdocument_batches(): + """The prune collector calls file_list.extend(batch) and reads `.id` on + every retained item, so retrieve_all_slim_docs_perm_sync must yield + lists of SlimDocument, not bare dicts.""" + connector = OutlookConnector(batch_size=2, user_ids=["alice@example.com"]) + connector._access_token = "tok" + + delta_resp = _ok({ + "value": [ + {"id": "m1", "subject": "a"}, + {"id": "m2", "subject": "b"}, + {"id": "m3", "subject": "c"}, + ], + }) + + with patch.object(connector, "_get", return_value=delta_resp): + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + + assert len(batches) == 2 + assert [len(b) for b in batches] == [2, 1] + flat = [item for batch in batches for item in batch] + assert all(isinstance(item, SlimDocument) for item in flat) + assert {item.id for item in flat} == {"m1", "m2", "m3"} + + +@pytest.mark.p2 +def test_retrieve_slim_docs_skips_removed(): + connector = OutlookConnector(batch_size=10, user_ids=["alice@example.com"]) + connector._access_token = "tok" + + delta_resp = _ok({ + "value": [ + {"id": "del", "@removed": {"reason": "deleted"}}, + {"id": "keep", "subject": "kept"}, + ], + }) + with patch.object(connector, "_get", return_value=delta_resp): + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + flat = [item for batch in batches for item in batch] + assert [item.id for item in flat] == ["keep"] + + +@pytest.mark.p2 +def test_retrieve_slim_docs_raises_on_http_error(): + connector = OutlookConnector(batch_size=10, user_ids=["alice@example.com"]) + connector._access_token = "tok" + with patch.object(connector, "_get", side_effect=[_err(502, "bad gateway")]): + with pytest.raises(UnexpectedValidationError): + list(connector.retrieve_all_slim_docs_perm_sync()) + + +@pytest.mark.p2 +def test_retrieve_slim_docs_requires_credentials(): + connector = OutlookConnector() + with pytest.raises(ConnectorMissingCredentialError): + list(connector.retrieve_all_slim_docs_perm_sync()) + + +# --------------------------------------------------------------------------- +# load_from_checkpoint: resumes from delta_links +# --------------------------------------------------------------------------- + +@pytest.mark.p1 +def test_load_from_checkpoint_uses_persisted_delta_link(): + """With a delta_link for a user the connector must hit that URL — not + the per-user mailbox root — so incremental runs resume properly.""" + connector = OutlookConnector(batch_size=10, user_ids=["alice@example.com"]) + connector._access_token = "tok" + + saved = "https://graph.microsoft.com/v1.0/users/alice@example.com/delta?$skiptoken=ABC" + ckpt = OutlookCheckpoint(has_more=True, delta_links={"alice@example.com": saved}) + + visited: list[str] = [] + + def _stub(url): + visited.append(url) + return _ok({"value": [], "@odata.deltaLink": "next-link"}) + + with patch.object(connector, "_get", side_effect=_stub): + list(connector.load_from_checkpoint(0.0, 0.0, ckpt)) + + # First (and only) call must be the saved delta link. + assert visited == [saved] + assert ckpt.delta_links == {"alice@example.com": "next-link"} + + +# --------------------------------------------------------------------------- +# _redact: keep debugging hint, drop PII +# --------------------------------------------------------------------------- + +@pytest.mark.p3 +def test_redact_email_masks_local_and_domain(): + assert _redact("alice@example.com") == "al***@***" + + +@pytest.mark.p3 +def test_redact_short_email_keeps_local(): + assert _redact("a@x.com") == "a@***" + + +@pytest.mark.p3 +def test_redact_object_id_keeps_prefix(): + assert _redact("12345678-1234-1234-1234-123456789012") == "1234***" + + +@pytest.mark.p3 +def test_redact_empty_value(): + assert _redact("") == "" + assert _redact(None) == "" diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 4b38f1acbf..68878961d1 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1395,6 +1395,18 @@ Example: Virtual Hosted Style`, 'Client secret value generated in the Azure AD app registration.', onedriveFolderPathTip: 'Optional sub-folder path to limit indexing (e.g. /Documents/Reports). Leave blank to index the entire drive.', + outlookDescription: + 'Connect Outlook / Microsoft 365 mailboxes and index messages via Microsoft Graph delta queries.', + outlookTenantIdTip: + 'Azure Active Directory tenant ID (Directory ID) of the Microsoft 365 organisation.', + outlookClientIdTip: + 'Application (client) ID of the Azure AD app registration with Mail.Read permission.', + outlookClientSecretTip: + 'Client secret value generated in the Azure AD app registration.', + outlookFolderTip: + 'Mail folder to sync (e.g. inbox, sentitems, archive). Defaults to inbox.', + outlookUserIdsTip: + 'Comma-separated UPNs or object IDs of mailboxes to sync. Leave blank to sync every mailbox in the tenant (requires User.Read.All).', restApiQueryParamsTip: 'Key=value pairs (one per line) sent as URL query parameters. Use this instead of embedding params in the URL.', restApiHeadersTip: diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index 7a0ec91f8f..ae2daf6d13 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -1099,12 +1099,26 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系 gmailTokenTip: '请上传由 Google Console 生成的 OAuth JSON。如果仅包含 client credentials,请通过浏览器授权一次以获取长期有效的刷新 Token。', dropboxDescription: '连接 Dropbox,同步指定账号下的文件与文件夹。', - onedriveDescription: '连接 OneDrive 或 OneDrive for Business,通过 Microsoft Graph delta 查询索引文件和文件夹。', + onedriveDescription: + '连接 OneDrive 或 OneDrive for Business,通过 Microsoft Graph delta 查询索引文件和文件夹。', onedriveTenantIdTip: 'Microsoft 365 组织的 Azure Active Directory 租户 ID(目录 ID)。', onedriveClientIdTip: '拥有 Files.Read.All 权限的 Azure AD 应用注册的应用程序(客户端)ID。', onedriveClientSecretTip: '在 Azure AD 应用注册中生成的客户端密钥值。', - onedriveFolderPathTip: '可选的子文件夹路径,用于限制索引范围(例如 /Documents/Reports)。留空则索引整个云盘。', - teamsDescription: '通过 Microsoft Graph 连接 Microsoft Teams,同步频道帖子与回复。', + onedriveFolderPathTip: + '可选的子文件夹路径,用于限制索引范围(例如 /Documents/Reports)。留空则索引整个云盘。', + outlookDescription: + '连接 Outlook / Microsoft 365 邮箱,通过 Microsoft Graph delta 查询索引邮件。', + outlookTenantIdTip: + 'Microsoft 365 组织的 Azure Active Directory 租户 ID(目录 ID)。', + outlookClientIdTip: + '拥有 Mail.Read 权限的 Azure AD 应用注册的应用程序(客户端)ID。', + outlookClientSecretTip: '在 Azure AD 应用注册中生成的客户端密钥值。', + outlookFolderTip: + '要同步的邮件文件夹(例如 inbox、sentitems、archive),默认为 inbox。', + outlookUserIdsTip: + '要同步的邮箱 UPN 或对象 ID 列表(逗号分隔)。留空则同步租户内的所有邮箱(需要 User.Read.All 权限)。', + teamsDescription: + '通过 Microsoft Graph 连接 Microsoft Teams,同步频道帖子与回复。', teamsTenantIdTip: 'Azure AD 租户 ID。需要具备 Team.ReadBasic.All 与 ChannelMessage.Read.All 应用权限(管理员同意)的应用。', slackDescription: '连接你的 Slack 工作区,同步频道消息与讨论串。', diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 2938b6ee47..17d66b97b3 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -44,6 +44,7 @@ export enum DataSourceKey { REST_API = 'rest_api', RSS = 'rss', ONEDRIVE = 'onedrive', + OUTLOOK = 'outlook', TEAMS = 'teams', SLACK = 'slack', SHAREPOINT = 'sharepoint', @@ -133,6 +134,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.ONEDRIVE]: { syncDeletedFiles: true, }, + [DataSourceKey.OUTLOOK]: { + syncDeletedFiles: true, + }, [DataSourceKey.TEAMS]: { syncDeletedFiles: true, }, @@ -326,6 +330,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.ONEDRIVE}Description`), icon: , }, + [DataSourceKey.OUTLOOK]: { + name: 'Outlook', + description: t(`setting.${DataSourceKey.OUTLOOK}Description`), + icon: , + }, }; }; @@ -455,6 +464,57 @@ export const DataSourceFormFields = { }, }, ], + [DataSourceKey.OUTLOOK]: [ + { + label: 'Tenant ID', + name: 'config.credentials.tenant_id', + type: FormFieldType.Text, + required: true, + placeholder: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', + tooltip: t('setting.outlookTenantIdTip'), + }, + { + label: 'Client ID', + name: 'config.credentials.client_id', + type: FormFieldType.Text, + required: true, + placeholder: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', + tooltip: t('setting.outlookClientIdTip'), + }, + { + label: 'Client Secret', + name: 'config.credentials.client_secret', + type: FormFieldType.Password, + required: true, + tooltip: t('setting.outlookClientSecretTip'), + }, + { + label: 'Mail Folder', + name: 'config.folder', + type: FormFieldType.Text, + required: false, + placeholder: 'inbox', + tooltip: t('setting.outlookFolderTip'), + }, + { + label: 'Mailbox User IDs (optional)', + name: 'config.user_ids', + type: FormFieldType.Text, + required: false, + placeholder: 'support@example.com, sales@example.com', + tooltip: t('setting.outlookUserIdsTip'), + }, + { + label: 'Batch Size', + name: 'config.batch_size', + type: FormFieldType.Number, + required: false, + validation: { + min: 1, + message: 'Batch Size must be at least 1', + }, + }, + ], [DataSourceKey.RSS]: [ { label: 'Feed URL', @@ -1908,6 +1968,20 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.OUTLOOK]: { + name: '', + source: DataSourceKey.OUTLOOK, + config: { + folder: 'inbox', + user_ids: '', + batch_size: 2, + credentials: { + tenant_id: '', + client_id: '', + client_secret: '', + }, + }, + }, [DataSourceKey.REST_API]: { name: '', source: DataSourceKey.REST_API,