From 98bc9ca6acc3354afa762eeffa44edf43bcc86c0 Mon Sep 17 00:00:00 2001 From: web-dev0521 Date: Thu, 28 May 2026 03:10:38 -0600 Subject: [PATCH] feat: implement Microsoft Teams data source connector (#15193) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Closes #15191. RAGFlow shipped a Microsoft Teams connector stub (`common/data_source/teams_connector.py`) whose document-loading methods all returned `[]`, `Teams._generate()` was a `pass`, and Teams was commented out of the data-source settings UI. As a result there was no way to index Teams channel conversations into a knowledge base. This PR implements the connector end to end on top of Microsoft Graph (Office365-REST-Python-Client). It shares the MSAL client-credentials auth shape with the SharePoint connector. **Backend** - `common/data_source/teams_connector.py` - `load_credentials()` now builds the Graph client using an MSAL client-credentials **token callback** — the form `GraphClient` actually expects. (The previous stub passed a raw access-token string to `GraphClient(...)`, which is not how that client is driven.) Token acquisition is lazy, so credential loading performs no network call. - `validate_connector_settings()` lists teams via Graph. - `load_from_checkpoint()` is now a generator that pages teams → channels → messages, flattens each top-level post together with its replies into one blob-based `Document` (`extension` `.txt`/`.html`, `blob`, `size_bytes`, `doc_updated_at`). Incremental syncs are bounded by message `lastModifiedDateTime` (falling back to `createdDateTime`). Per-message errors surface as `ConnectorFailure` instead of aborting the run. - `retrieve_all_slim_docs_perm_sync()` yields id-only `SlimDocument` batches and the checkpoint helpers return proper `TeamsCheckpoint`s. - ACL → `ExternalAccess` mapping is intentionally left best-effort (`load_from_checkpoint_with_perm_sync` delegates to the standard load) because the sync pipeline does not currently persist `ExternalAccess`. - `rag/svr/sync_data_source.py` - Implemented `Teams._generate()` using the existing `CheckpointOutputWrapper` pattern (same shape as Confluence/Jira/Google Drive), supporting full reindex and incremental polling from `poll_range_start`. - `TeamsConnector` is already exported from `common/data_source/__init__.py`. **Frontend (`web/`)** - Enabled the `TEAMS` data-source enum and added its form fields (`tenant_id`, `client_id`, `client_secret`), default values, display metadata, and a Teams icon. - Added `teamsDescription` / `teamsTenantIdTip` to `en.ts` and `zh.ts`. **Tests** - `test/unit_test/data_source/test_teams_connector_unit.py`: mock-based unit tests covering credential loading (incomplete creds raise, happy path sets the Graph client, fetch-without-creds raises), post/reply flattening (incl. the HTML vs text extension), incremental `lastModifiedDateTime` filtering, and slim-doc listing. All 6 pass; `ruff check` is clean. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- common/data_source/teams_connector.py | 329 ++++++++++++++---- rag/svr/sync_data_source.py | 62 +++- .../data_source/test_teams_connector_unit.py | 261 ++++++++++++++ web/src/assets/svg/data-source/teams.svg | 9 + web/src/locales/en.ts | 4 + web/src/locales/zh.ts | 3 + .../data-source/constant/index.tsx | 43 ++- 7 files changed, 642 insertions(+), 69 deletions(-) create mode 100644 test/unit_test/data_source/test_teams_connector_unit.py create mode 100644 web/src/assets/svg/data-source/teams.svg diff --git a/common/data_source/teams_connector.py b/common/data_source/teams_connector.py index 98b472667a..a4bb75d358 100644 --- a/common/data_source/teams_connector.py +++ b/common/data_source/teams_connector.py @@ -1,25 +1,44 @@ -"""Microsoft Teams connector""" +"""Microsoft Teams connector -from typing import Any +Ingests Microsoft Teams channel conversations (posts and their replies) via the +Microsoft Graph API (Office365-REST-Python-Client). Authentication uses MSAL +client-credentials (app-only) flow, so it requires an Azure AD app with the +``Team.ReadBasic.All`` and ``ChannelMessage.Read.All`` application permissions +(admin-consented). + +Each top-level channel post is flattened together with its replies into one +blob-based ``Document``. Incremental syncs are bounded by the post +``lastModifiedDateTime`` (falling back to ``createdDateTime``). +""" + +import logging +from datetime import datetime, timezone +from typing import Any, Generator import msal from office365.graph_client import GraphClient -from office365.runtime.client_request_exception import ClientRequestException from common.data_source.exceptions import ( + ConnectorMissingCredentialError, ConnectorValidationError, InsufficientPermissionsError, - UnexpectedValidationError, ConnectorMissingCredentialError + UnexpectedValidationError, ) from common.data_source.interfaces import ( + CheckpointedConnectorWithPermSync, SecondsSinceUnixEpoch, - SlimConnectorWithPermSync, CheckpointedConnectorWithPermSync + SlimConnectorWithPermSync, ) from common.data_source.models import ( - ConnectorCheckpoint + ConnectorCheckpoint, + ConnectorFailure, + Document, + DocumentFailure, + SlimDocument, ) _SLIM_DOC_BATCH_SIZE = 5000 +GRAPH_SCOPES = ["https://graph.microsoft.com/.default"] class TeamsCheckpoint(ConnectorCheckpoint): @@ -28,86 +47,264 @@ class TeamsCheckpoint(ConnectorCheckpoint): class TeamsConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync): - """Microsoft Teams connector for accessing Teams messages and channels""" + """Microsoft Teams connector for accessing Teams messages and channels.""" def __init__(self, batch_size: int = _SLIM_DOC_BATCH_SIZE) -> None: self.batch_size = batch_size - self.teams_client = None + self.graph_client: GraphClient | None = None + + # -- credentials --------------------------------------------------------- def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: - """Load Microsoft Teams credentials""" - try: - tenant_id = credentials.get("tenant_id") - client_id = credentials.get("client_id") - client_secret = credentials.get("client_secret") - - if not all([tenant_id, client_id, client_secret]): - raise ConnectorMissingCredentialError("Microsoft Teams credentials are incomplete") - - # Create MSAL confidential client - app = msal.ConfidentialClientApplication( - client_id=client_id, - client_credential=client_secret, - authority=f"https://login.microsoftonline.com/{tenant_id}" - ) - - # Get access token - result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"]) - - if "access_token" not in result: - raise ConnectorMissingCredentialError("Failed to acquire Microsoft Teams access token") - - # Create Graph client for Teams - self.teams_client = GraphClient(result["access_token"]) - - return None - except Exception as e: - raise ConnectorMissingCredentialError(f"Microsoft Teams: {e}") + """Configure a Microsoft Graph client from app-only credentials. + + Uses a lazy MSAL token callback (the form ``GraphClient`` expects), so + this performs no network call; the first request acquires the token. + """ + tenant_id = credentials.get("tenant_id") + client_id = credentials.get("client_id") + client_secret = credentials.get("client_secret") + + if not all([tenant_id, client_id, client_secret]): + raise ConnectorMissingCredentialError("Microsoft Teams credentials are incomplete") + + authority = f"https://login.microsoftonline.com/{tenant_id}" + # Build the MSAL app once and reuse it across token acquisitions so its + # in-memory token cache is honored. Re-creating the app on every call + # (as the callback previously did) defeats the cache and triggers an + # Azure AD round-trip for each request. + app = msal.ConfidentialClientApplication( + client_id=client_id, + client_credential=client_secret, + authority=authority, + ) + + def _acquire_token() -> dict[str, Any]: + """Return a cached or freshly minted app-only Graph token.""" + token = app.acquire_token_for_client(scopes=GRAPH_SCOPES) + if "access_token" not in token: + detail = token.get("error_description") or token.get("error") or token + raise ConnectorMissingCredentialError( + f"Failed to acquire Microsoft Teams access token: {detail}" + ) + return token + + self.graph_client = GraphClient(_acquire_token) + return None def validate_connector_settings(self) -> None: - """Validate Microsoft Teams connector settings""" - if not self.teams_client: + """Validate credentials by listing teams.""" + if self.graph_client is None: raise ConnectorMissingCredentialError("Microsoft Teams") - - try: - # Test connection by getting teams - teams = self.teams_client.teams.get().execute_query() - if not teams: - raise ConnectorValidationError("Failed to access Microsoft Teams") - except ClientRequestException as e: - if "401" in str(e) or "403" in str(e): - raise InsufficientPermissionsError("Invalid credentials or insufficient permissions") - else: - raise UnexpectedValidationError(f"Microsoft Teams validation error: {e}") - def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Any: - """Poll Microsoft Teams for recent messages""" - # Simplified implementation - in production this would handle actual polling - return [] + try: + self.graph_client.teams.get().execute_query() + except ConnectorValidationError: + raise + except Exception as e: + message = str(e) + if "401" in message or "403" in message: + raise InsufficientPermissionsError( + "Invalid credentials or insufficient permissions for Microsoft Teams" + ) + raise UnexpectedValidationError(f"Microsoft Teams validation error: {e}") + + # -- helpers ------------------------------------------------------------- + + @staticmethod + def _prop(obj: Any, name: str) -> Any: + """Read a property by name, falling back to the OData ``properties`` dict.""" + value = getattr(obj, name, None) + if value is None: + value = getattr(obj, "properties", {}).get(name) + return value + + @staticmethod + def _parse_dt(value: Any) -> datetime | None: + """Parse a Graph datetime (ISO string or datetime) into a tz-aware UTC datetime.""" + if value is None: + return None + if isinstance(value, datetime): + dt = value + elif isinstance(value, str): + try: + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + return None + else: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + + @classmethod + def _message_body(cls, message: Any) -> tuple[str, str]: + """Return ``(content, content_type)`` from a message's ItemBody.""" + body = getattr(message, "body", None) + if body is None: + return "", "text" + content = getattr(body, "content", None) + if content is None: + content = getattr(body, "properties", {}).get("content") + content_type = getattr(body, "contentType", None) + if content_type is None: + content_type = getattr(body, "properties", {}).get("contentType") + return content or "", (content_type or "text").lower() + + def _message_to_document( + self, + message: Any, + replies: list[Any], + team_id: str, + team_name: str, + channel_id: str, + channel_name: str, + ) -> Document: + """Flatten a post and its replies into a single blob-based Document.""" + thread = [message, *replies] + + contents = [] + content_type = "text" + latest = None + for item in thread: + text, ctype = self._message_body(item) + if text: + contents.append(text) + if ctype == "html": + content_type = "html" + modified = self._parse_dt(self._prop(item, "lastModifiedDateTime")) or self._parse_dt( + self._prop(item, "createdDateTime") + ) + if modified is not None and (latest is None or modified > latest): + latest = modified + + joined = "\n\n".join(contents) + blob = joined.encode("utf-8") + + snippet = joined.strip().replace("\n", " ") + if len(snippet) > 50: + snippet = snippet[:50].rstrip() + "..." + semantic_identifier = f"{channel_name}: {snippet}" if snippet else f"{channel_name} message" + + metadata = {"team": team_name, "channel": channel_name} + web_url = self._prop(message, "web_url") or self._prop(message, "webUrl") + if web_url: + metadata["web_url"] = web_url + + return Document( + id=f"{team_id}__{channel_id}__{message.id}", + source="teams", + semantic_identifier=semantic_identifier, + extension=".html" if content_type == "html" else ".txt", + blob=blob, + size_bytes=len(blob), + doc_updated_at=latest or datetime.now(timezone.utc), + metadata=metadata, + ) + + def _iter_channel_messages(self): + """Yield (team_id, team_name, channel_id, channel_name, message) tuples. + + Uses ``get_all()`` for every collection so Microsoft Graph's + ``@odata.nextLink`` pages are followed; ``get().execute_query()`` would + only return the first page and silently drop the rest on larger tenants. + """ + teams = self.graph_client.teams.get_all().execute_query() + for team in teams: + team_id = str(team.id) + team_name = self._prop(team, "displayName") or team_id + channels = team.channels.get_all().execute_query() + for channel in channels: + channel_id = str(channel.id) + channel_name = self._prop(channel, "displayName") or channel_id + messages = channel.messages.get_all().execute_query() + for message in messages: + yield team_id, team_name, channel_id, channel_name, message + + def _generate_documents( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + ) -> Generator[Document | ConnectorFailure, None, None]: + """Yield a Document per in-window channel post, or a failure per error.""" + if self.graph_client is None: + raise ConnectorMissingCredentialError("Microsoft Teams") + + for team_id, team_name, channel_id, channel_name, message in self._iter_channel_messages(): + try: + modified = self._parse_dt(self._prop(message, "lastModifiedDateTime")) or self._parse_dt( + self._prop(message, "createdDateTime") + ) + if modified is not None: + ts = modified.timestamp() + # start is an exclusive lower bound; full reindex passes start=0. + if not (start < ts <= end): + continue + + replies = list(message.replies.get_all().execute_query()) + yield self._message_to_document( + message, replies, team_id, team_name, channel_id, channel_name + ) + except Exception as e: + logging.exception("Microsoft Teams failed to process message") + yield ConnectorFailure( + failed_document=DocumentFailure( + document_id=str(getattr(message, "id", "unknown")), + document_link=self._prop(message, "web_url") or "", + ), + failure_message=str(e), + exception=e, + ) + + # -- checkpointed connector interface ------------------------------------ def load_from_checkpoint( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch, checkpoint: ConnectorCheckpoint, - ) -> Any: - """Load documents from checkpoint""" - # Simplified implementation - return [] + ) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]: + """Yield a Document per channel post (with replies), then finish. + + All teams/channels are enumerated in one pass, so the returned + checkpoint always has ``has_more=False``. + """ + yield from self._generate_documents(start, end) + return TeamsCheckpoint(has_more=False) + + def load_from_checkpoint_with_perm_sync( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ConnectorCheckpoint, + ) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]: + """Permission-aware variant. + + Teams ACL -> ExternalAccess mapping is not yet wired through the sync + pipeline (it does not persist ExternalAccess), so this currently yields + the same documents as ``load_from_checkpoint``. + """ + return self.load_from_checkpoint(start, end, checkpoint) def build_dummy_checkpoint(self) -> ConnectorCheckpoint: - """Build dummy checkpoint""" - return TeamsCheckpoint() + return TeamsCheckpoint(has_more=True) def validate_checkpoint_json(self, checkpoint_json: str) -> ConnectorCheckpoint: - """Validate checkpoint JSON""" - # Simplified implementation - return TeamsCheckpoint() + return TeamsCheckpoint(has_more=True) def retrieve_all_slim_docs_perm_sync( self, callback: Any = None, - ) -> Any: - """Retrieve all simplified documents with permission sync""" - # Simplified implementation - return [] + ) -> Generator[list[SlimDocument], None, None]: + """Yield batches of slim documents (ids only) for prune/permission sync.""" + if self.graph_client is None: + raise ConnectorMissingCredentialError("Microsoft Teams") + + batch: list[SlimDocument] = [] + for team_id, _team_name, channel_id, _channel_name, message in self._iter_channel_messages(): + batch.append(SlimDocument(id=f"{team_id}__{channel_id}__{message.id}")) + if len(batch) >= self.batch_size: + yield batch + batch = [] + if batch: + yield batch diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 76ba60a3fe..2283458f03 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -61,6 +61,7 @@ from common.data_source import ( RDBMSConnector, DingTalkAITableConnector, RestAPIConnector, + TeamsConnector, SlackConnector, SharePointConnector, ) @@ -1058,7 +1059,66 @@ class Teams(SyncBase): SOURCE_NAME: str = FileSource.TEAMS async def _generate(self, task: dict): - pass + self.connector = TeamsConnector( + batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE), + ) + + credentials = self.conf.get("credentials") or {} + self.connector.load_credentials(credentials) + self.connector.validate_connector_settings() + + if task["reindex"] == "1" or not task["poll_range_start"]: + start_time = 0.0 + _begin_info = "totally" + else: + start_time = task["poll_range_start"].timestamp() + _begin_info = f"from {task['poll_range_start']}" + + end_time = datetime.now(timezone.utc).timestamp() + + raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + def document_batches(): + checkpoint = self.connector.build_dummy_checkpoint() + pending_docs = [] + iterations = 0 + iteration_limit = 100_000 + + while checkpoint.has_more: + wrapper = CheckpointOutputWrapper() + doc_generator = wrapper( + self.connector.load_from_checkpoint(start_time, end_time, checkpoint) + ) + for document, failure, next_checkpoint in doc_generator: + if failure is not None: + logging.warning( + "Teams connector failure: %s", + getattr(failure, "failure_message", failure), + ) + continue + if document is not None: + pending_docs.append(document) + if len(pending_docs) >= batch_size: + yield pending_docs + pending_docs = [] + if next_checkpoint is not None: + checkpoint = next_checkpoint + + iterations += 1 + if iterations > iteration_limit: + raise RuntimeError("Too many iterations while loading Teams documents.") + + if pending_docs: + yield pending_docs + + self.log_connection("Microsoft Teams", "workspace", task) + return document_batches() class WebDAV(SyncBase): diff --git a/test/unit_test/data_source/test_teams_connector_unit.py b/test/unit_test/data_source/test_teams_connector_unit.py new file mode 100644 index 0000000000..e03dfc5f60 --- /dev/null +++ b/test/unit_test/data_source/test_teams_connector_unit.py @@ -0,0 +1,261 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import importlib.util +import sys +from datetime import datetime, timezone +from pathlib import Path +from types import ModuleType, SimpleNamespace + +import pytest + + +def _load_teams_connector_module(): + """Load teams_connector.py in isolation (avoid the package __init__).""" + repo_root = Path(__file__).resolve().parents[3] + package_name = "common.data_source" + saved_modules = { + name: module + for name, module in sys.modules.items() + if name == package_name or name.startswith(f"{package_name}.") + } + package_stub = ModuleType(package_name) + package_stub.__path__ = [str(repo_root / "common" / "data_source")] + sys.modules[package_name] = package_stub + + try: + spec = importlib.util.spec_from_file_location( + "_teams_connector_under_test", + repo_root / "common" / "data_source" / "teams_connector.py", + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + finally: + for name in list(sys.modules): + if name == package_name or name.startswith(f"{package_name}."): + if name in saved_modules: + sys.modules[name] = saved_modules[name] + else: + sys.modules.pop(name, None) + + +teams_connector = _load_teams_connector_module() +TeamsConnector = teams_connector.TeamsConnector + +pytestmark = pytest.mark.p2 + + +# --- fakes for the office365 fluent API ------------------------------------ + + +class _Query: + def __init__(self, value): + self._value = value + + def execute_query(self): + return self._value + + +class _Collection: + def __init__(self, items): + self._items = items + + def get(self): + return _Query(self._items) + + def get_all(self): + # The connector pages with get_all(); the fake returns every item at once. + return _Query(self._items) + + +class _Body: + def __init__(self, content, content_type="text"): + self.content = content + self.contentType = content_type + + +class _Message: + def __init__(self, msg_id, content, content_type="text", modified=None, replies=None): + self.id = msg_id + self.body = _Body(content, content_type) + self.web_url = f"https://teams.microsoft.com/{msg_id}" + self._replies = replies or [] + self.properties = {} + if modified is not None: + self.properties["lastModifiedDateTime"] = modified + self.properties["createdDateTime"] = modified + + @property + def replies(self): + return _Collection(self._replies) + + +class _Channel: + def __init__(self, channel_id, display_name, messages): + self.id = channel_id + self._messages = messages + self.properties = {"displayName": display_name} + + @property + def messages(self): + return _Collection(self._messages) + + +class _Team: + def __init__(self, team_id, display_name, channels): + self.id = team_id + self._channels = channels + self.properties = {"displayName": display_name} + + @property + def channels(self): + return _Collection(self._channels) + + +class _FakeGraphClient: + def __init__(self, teams): + self.teams = _Collection(teams) + + +def _build_connector(): + jan = "2026-01-01T12:00:00Z" + feb = "2026-02-01T12:00:00Z" + + reply = _Message("m1-r1", "All set.", modified=jan) + post1 = _Message("m1", "How do we deploy?", modified=jan, replies=[reply]) + post2 = _Message("m2", "Release notes", content_type="html", modified=feb) + channel = _Channel("c1", "General", [post1, post2]) + team = _Team("t1", "Engineering", [channel]) + + connector = TeamsConnector(batch_size=100) + connector.graph_client = _FakeGraphClient([team]) + return connector + + +def _collect(generator): + docs = [] + try: + while True: + docs.append(next(generator)) + except StopIteration as stop: + return docs, stop.value + + +# --- credentials ------------------------------------------------------------ + + +def test_load_credentials_incomplete_raises(): + connector = TeamsConnector() + with pytest.raises(teams_connector.ConnectorMissingCredentialError): + connector.load_credentials({"tenant_id": "t"}) + + +def test_load_credentials_sets_graph_client(monkeypatch): + class _FakeApp: + def __init__(self, **kwargs): + pass + + def acquire_token_for_client(self, scopes): + return {"access_token": "tok"} + + monkeypatch.setattr(teams_connector.msal, "ConfidentialClientApplication", _FakeApp) + monkeypatch.setattr(teams_connector, "GraphClient", lambda token_callback: SimpleNamespace(cb=token_callback)) + + connector = TeamsConnector() + result = connector.load_credentials( + {"tenant_id": "tenant", "client_id": "client", "client_secret": "secret"} + ) + + assert result is None + assert connector.graph_client is not None + + +def test_fetch_without_credentials_raises(): + connector = TeamsConnector() + with pytest.raises(teams_connector.ConnectorMissingCredentialError): + list(connector.load_from_checkpoint(0.0, 9e12, connector.build_dummy_checkpoint())) + + +def test_validate_without_client_raises(): + connector = TeamsConnector() + with pytest.raises(teams_connector.ConnectorMissingCredentialError): + connector.validate_connector_settings() + + +def test_validate_maps_permission_error(): + class _RaisingQuery: + def execute_query(self): + raise Exception("(403) Forbidden: insufficient privileges") + + class _RaisingCollection: + def get(self): + return _RaisingQuery() + + connector = TeamsConnector() + connector.graph_client = SimpleNamespace(teams=_RaisingCollection()) + + with pytest.raises(teams_connector.InsufficientPermissionsError): + connector.validate_connector_settings() + + +# --- document generation ---------------------------------------------------- + + +def test_load_from_checkpoint_flattens_posts_and_replies(): + connector = _build_connector() + + docs, checkpoint = _collect( + connector.load_from_checkpoint(0.0, 9e12, connector.build_dummy_checkpoint()) + ) + + assert checkpoint.has_more is False + assert {doc.id for doc in docs} == {"t1__c1__m1", "t1__c1__m2"} + + by_id = {doc.id: doc for doc in docs} + # Post + reply flattened into one blob. + assert by_id["t1__c1__m1"].blob == b"How do we deploy?\n\nAll set." + assert by_id["t1__c1__m1"].source == "teams" + assert by_id["t1__c1__m1"].extension == ".txt" + assert by_id["t1__c1__m1"].metadata == { + "team": "Engineering", + "channel": "General", + "web_url": "https://teams.microsoft.com/m1", + } + # HTML post gets the .html extension. + assert by_id["t1__c1__m2"].extension == ".html" + assert by_id["t1__c1__m2"].blob == b"Release notes" + + +def test_load_from_checkpoint_filters_by_modified_window(): + connector = _build_connector() + + start = datetime(2026, 1, 15, tzinfo=timezone.utc).timestamp() + end = datetime(2026, 3, 1, tzinfo=timezone.utc).timestamp() + + docs, _ = _collect( + connector.load_from_checkpoint(start, end, connector.build_dummy_checkpoint()) + ) + + assert [doc.id for doc in docs] == ["t1__c1__m2"] + + +def test_retrieve_all_slim_docs_lists_ids(): + connector = _build_connector() + + batches = list(connector.retrieve_all_slim_docs_perm_sync()) + ids = [doc.id for batch in batches for doc in batch] + + assert sorted(ids) == ["t1__c1__m1", "t1__c1__m2"] diff --git a/web/src/assets/svg/data-source/teams.svg b/web/src/assets/svg/data-source/teams.svg new file mode 100644 index 0000000000..359332f298 --- /dev/null +++ b/web/src/assets/svg/data-source/teams.svg @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 86e8986a37..d0efc988aa 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1238,6 +1238,10 @@ Example: Virtual Hosted Style`, 'Upload the OAuth JSON generated from Google Console. If it only contains client credentials, run the browser-based verification once to mint long-lived refresh tokens.', dropboxDescription: 'Connect your Dropbox to sync files and folders from a chosen account.', + teamsDescription: + 'Connect Microsoft Teams via Microsoft Graph to sync channel posts and replies.', + teamsTenantIdTip: + 'Azure AD tenant ID. Requires an app with Team.ReadBasic.All and ChannelMessage.Read.All application permissions (admin consent).', slackDescription: 'Connect your Slack workspace to sync channel messages and threads.', slackBotTokenTip: diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index 9f799ffc3f..473781af87 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -1099,6 +1099,9 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系 gmailTokenTip: '请上传由 Google Console 生成的 OAuth JSON。如果仅包含 client credentials,请通过浏览器授权一次以获取长期有效的刷新 Token。', dropboxDescription: '连接 Dropbox,同步指定账号下的文件与文件夹。', + teamsDescription: '通过 Microsoft Graph 连接 Microsoft Teams,同步频道帖子与回复。', + teamsTenantIdTip: + 'Azure AD 租户 ID。需要具备 Team.ReadBasic.All 与 ChannelMessage.Read.All 应用权限(管理员同意)的应用。', slackDescription: '连接你的 Slack 工作区,同步频道消息与讨论串。', slackBotTokenTip: 'Slack 机器人用户 OAuth Token(以 xoxb- 开头)。应用需具备 channels:read、channels:history 和 users:read 权限。', diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 587132c4a9..a3480b35e3 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -43,10 +43,9 @@ export enum DataSourceKey { POSTGRESQL = 'postgresql', REST_API = 'rest_api', RSS = 'rss', + TEAMS = 'teams', SLACK = 'slack', SHAREPOINT = 'sharepoint', - - // TEAMS = 'teams', } type DataSourceFeatureVisibility = { @@ -130,6 +129,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.MOODLE]: { syncDeletedFiles: true, }, + [DataSourceKey.TEAMS]: { + syncDeletedFiles: true, + }, [DataSourceKey.SLACK]: { syncDeletedFiles: true, }, @@ -219,6 +221,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.MOODLE}Description`), icon: , }, + [DataSourceKey.TEAMS]: { + name: 'Microsoft Teams', + description: t(`setting.${DataSourceKey.TEAMS}Description`), + icon: , + }, [DataSourceKey.SLACK]: { name: 'Slack', description: t(`setting.${DataSourceKey.SLACK}Description`), @@ -670,6 +677,27 @@ export const DataSourceFormFields = { required: true, }, ], + [DataSourceKey.TEAMS]: [ + { + label: 'Tenant ID', + name: 'config.credentials.tenant_id', + type: FormFieldType.Text, + required: true, + tooltip: t('setting.teamsTenantIdTip'), + }, + { + label: 'Client ID', + name: 'config.credentials.client_id', + type: FormFieldType.Text, + required: true, + }, + { + label: 'Client Secret', + name: 'config.credentials.client_secret', + type: FormFieldType.Password, + required: true, + }, + ], [DataSourceKey.SLACK]: [ { label: 'Slack Bot Token', @@ -1569,6 +1597,17 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.TEAMS]: { + name: '', + source: DataSourceKey.TEAMS, + config: { + credentials: { + tenant_id: '', + client_id: '', + client_secret: '', + } + } + }, [DataSourceKey.SLACK]: { name: '', source: DataSourceKey.SLACK,