mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
feat: implement Microsoft Teams data source connector (#15193)
### What problem does this PR solve? Closes #15191. RAGFlow shipped a Microsoft Teams connector stub (`common/data_source/teams_connector.py`) whose document-loading methods all returned `[]`, `Teams._generate()` was a `pass`, and Teams was commented out of the data-source settings UI. As a result there was no way to index Teams channel conversations into a knowledge base. This PR implements the connector end to end on top of Microsoft Graph (Office365-REST-Python-Client). It shares the MSAL client-credentials auth shape with the SharePoint connector. **Backend** - `common/data_source/teams_connector.py` - `load_credentials()` now builds the Graph client using an MSAL client-credentials **token callback** — the form `GraphClient` actually expects. (The previous stub passed a raw access-token string to `GraphClient(...)`, which is not how that client is driven.) Token acquisition is lazy, so credential loading performs no network call. - `validate_connector_settings()` lists teams via Graph. - `load_from_checkpoint()` is now a generator that pages teams → channels → messages, flattens each top-level post together with its replies into one blob-based `Document` (`extension` `.txt`/`.html`, `blob`, `size_bytes`, `doc_updated_at`). Incremental syncs are bounded by message `lastModifiedDateTime` (falling back to `createdDateTime`). Per-message errors surface as `ConnectorFailure` instead of aborting the run. - `retrieve_all_slim_docs_perm_sync()` yields id-only `SlimDocument` batches and the checkpoint helpers return proper `TeamsCheckpoint`s. - ACL → `ExternalAccess` mapping is intentionally left best-effort (`load_from_checkpoint_with_perm_sync` delegates to the standard load) because the sync pipeline does not currently persist `ExternalAccess`. - `rag/svr/sync_data_source.py` - Implemented `Teams._generate()` using the existing `CheckpointOutputWrapper` pattern (same shape as Confluence/Jira/Google Drive), supporting full reindex and incremental polling from `poll_range_start`. - `TeamsConnector` is already exported from `common/data_source/__init__.py`. **Frontend (`web/`)** - Enabled the `TEAMS` data-source enum and added its form fields (`tenant_id`, `client_id`, `client_secret`), default values, display metadata, and a Teams icon. - Added `teamsDescription` / `teamsTenantIdTip` to `en.ts` and `zh.ts`. **Tests** - `test/unit_test/data_source/test_teams_connector_unit.py`: mock-based unit tests covering credential loading (incomplete creds raise, happy path sets the Graph client, fetch-without-creds raises), post/reply flattening (incl. the HTML vs text extension), incremental `lastModifiedDateTime` filtering, and slim-doc listing. All 6 pass; `ruff check` is clean. ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@@ -1,25 +1,44 @@
|
||||
"""Microsoft Teams connector"""
|
||||
"""Microsoft Teams connector
|
||||
|
||||
from typing import Any
|
||||
Ingests Microsoft Teams channel conversations (posts and their replies) via the
|
||||
Microsoft Graph API (Office365-REST-Python-Client). Authentication uses MSAL
|
||||
client-credentials (app-only) flow, so it requires an Azure AD app with the
|
||||
``Team.ReadBasic.All`` and ``ChannelMessage.Read.All`` application permissions
|
||||
(admin-consented).
|
||||
|
||||
Each top-level channel post is flattened together with its replies into one
|
||||
blob-based ``Document``. Incremental syncs are bounded by the post
|
||||
``lastModifiedDateTime`` (falling back to ``createdDateTime``).
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Generator
|
||||
|
||||
import msal
|
||||
from office365.graph_client import GraphClient
|
||||
from office365.runtime.client_request_exception import ClientRequestException
|
||||
|
||||
from common.data_source.exceptions import (
|
||||
ConnectorMissingCredentialError,
|
||||
ConnectorValidationError,
|
||||
InsufficientPermissionsError,
|
||||
UnexpectedValidationError, ConnectorMissingCredentialError
|
||||
UnexpectedValidationError,
|
||||
)
|
||||
from common.data_source.interfaces import (
|
||||
CheckpointedConnectorWithPermSync,
|
||||
SecondsSinceUnixEpoch,
|
||||
SlimConnectorWithPermSync, CheckpointedConnectorWithPermSync
|
||||
SlimConnectorWithPermSync,
|
||||
)
|
||||
from common.data_source.models import (
|
||||
ConnectorCheckpoint
|
||||
ConnectorCheckpoint,
|
||||
ConnectorFailure,
|
||||
Document,
|
||||
DocumentFailure,
|
||||
SlimDocument,
|
||||
)
|
||||
|
||||
_SLIM_DOC_BATCH_SIZE = 5000
|
||||
GRAPH_SCOPES = ["https://graph.microsoft.com/.default"]
|
||||
|
||||
|
||||
class TeamsCheckpoint(ConnectorCheckpoint):
|
||||
@@ -28,86 +47,264 @@ class TeamsCheckpoint(ConnectorCheckpoint):
|
||||
|
||||
|
||||
class TeamsConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync):
|
||||
"""Microsoft Teams connector for accessing Teams messages and channels"""
|
||||
"""Microsoft Teams connector for accessing Teams messages and channels."""
|
||||
|
||||
def __init__(self, batch_size: int = _SLIM_DOC_BATCH_SIZE) -> None:
|
||||
self.batch_size = batch_size
|
||||
self.teams_client = None
|
||||
self.graph_client: GraphClient | None = None
|
||||
|
||||
# -- credentials ---------------------------------------------------------
|
||||
|
||||
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
|
||||
"""Load Microsoft Teams credentials"""
|
||||
try:
|
||||
tenant_id = credentials.get("tenant_id")
|
||||
client_id = credentials.get("client_id")
|
||||
client_secret = credentials.get("client_secret")
|
||||
|
||||
if not all([tenant_id, client_id, client_secret]):
|
||||
raise ConnectorMissingCredentialError("Microsoft Teams credentials are incomplete")
|
||||
|
||||
# Create MSAL confidential client
|
||||
app = msal.ConfidentialClientApplication(
|
||||
client_id=client_id,
|
||||
client_credential=client_secret,
|
||||
authority=f"https://login.microsoftonline.com/{tenant_id}"
|
||||
)
|
||||
|
||||
# Get access token
|
||||
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
|
||||
|
||||
if "access_token" not in result:
|
||||
raise ConnectorMissingCredentialError("Failed to acquire Microsoft Teams access token")
|
||||
|
||||
# Create Graph client for Teams
|
||||
self.teams_client = GraphClient(result["access_token"])
|
||||
|
||||
return None
|
||||
except Exception as e:
|
||||
raise ConnectorMissingCredentialError(f"Microsoft Teams: {e}")
|
||||
"""Configure a Microsoft Graph client from app-only credentials.
|
||||
|
||||
Uses a lazy MSAL token callback (the form ``GraphClient`` expects), so
|
||||
this performs no network call; the first request acquires the token.
|
||||
"""
|
||||
tenant_id = credentials.get("tenant_id")
|
||||
client_id = credentials.get("client_id")
|
||||
client_secret = credentials.get("client_secret")
|
||||
|
||||
if not all([tenant_id, client_id, client_secret]):
|
||||
raise ConnectorMissingCredentialError("Microsoft Teams credentials are incomplete")
|
||||
|
||||
authority = f"https://login.microsoftonline.com/{tenant_id}"
|
||||
# Build the MSAL app once and reuse it across token acquisitions so its
|
||||
# in-memory token cache is honored. Re-creating the app on every call
|
||||
# (as the callback previously did) defeats the cache and triggers an
|
||||
# Azure AD round-trip for each request.
|
||||
app = msal.ConfidentialClientApplication(
|
||||
client_id=client_id,
|
||||
client_credential=client_secret,
|
||||
authority=authority,
|
||||
)
|
||||
|
||||
def _acquire_token() -> dict[str, Any]:
|
||||
"""Return a cached or freshly minted app-only Graph token."""
|
||||
token = app.acquire_token_for_client(scopes=GRAPH_SCOPES)
|
||||
if "access_token" not in token:
|
||||
detail = token.get("error_description") or token.get("error") or token
|
||||
raise ConnectorMissingCredentialError(
|
||||
f"Failed to acquire Microsoft Teams access token: {detail}"
|
||||
)
|
||||
return token
|
||||
|
||||
self.graph_client = GraphClient(_acquire_token)
|
||||
return None
|
||||
|
||||
def validate_connector_settings(self) -> None:
|
||||
"""Validate Microsoft Teams connector settings"""
|
||||
if not self.teams_client:
|
||||
"""Validate credentials by listing teams."""
|
||||
if self.graph_client is None:
|
||||
raise ConnectorMissingCredentialError("Microsoft Teams")
|
||||
|
||||
try:
|
||||
# Test connection by getting teams
|
||||
teams = self.teams_client.teams.get().execute_query()
|
||||
if not teams:
|
||||
raise ConnectorValidationError("Failed to access Microsoft Teams")
|
||||
except ClientRequestException as e:
|
||||
if "401" in str(e) or "403" in str(e):
|
||||
raise InsufficientPermissionsError("Invalid credentials or insufficient permissions")
|
||||
else:
|
||||
raise UnexpectedValidationError(f"Microsoft Teams validation error: {e}")
|
||||
|
||||
def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Any:
|
||||
"""Poll Microsoft Teams for recent messages"""
|
||||
# Simplified implementation - in production this would handle actual polling
|
||||
return []
|
||||
try:
|
||||
self.graph_client.teams.get().execute_query()
|
||||
except ConnectorValidationError:
|
||||
raise
|
||||
except Exception as e:
|
||||
message = str(e)
|
||||
if "401" in message or "403" in message:
|
||||
raise InsufficientPermissionsError(
|
||||
"Invalid credentials or insufficient permissions for Microsoft Teams"
|
||||
)
|
||||
raise UnexpectedValidationError(f"Microsoft Teams validation error: {e}")
|
||||
|
||||
# -- helpers -------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _prop(obj: Any, name: str) -> Any:
|
||||
"""Read a property by name, falling back to the OData ``properties`` dict."""
|
||||
value = getattr(obj, name, None)
|
||||
if value is None:
|
||||
value = getattr(obj, "properties", {}).get(name)
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def _parse_dt(value: Any) -> datetime | None:
|
||||
"""Parse a Graph datetime (ISO string or datetime) into a tz-aware UTC datetime."""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
dt = value
|
||||
elif isinstance(value, str):
|
||||
try:
|
||||
dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
|
||||
@classmethod
|
||||
def _message_body(cls, message: Any) -> tuple[str, str]:
|
||||
"""Return ``(content, content_type)`` from a message's ItemBody."""
|
||||
body = getattr(message, "body", None)
|
||||
if body is None:
|
||||
return "", "text"
|
||||
content = getattr(body, "content", None)
|
||||
if content is None:
|
||||
content = getattr(body, "properties", {}).get("content")
|
||||
content_type = getattr(body, "contentType", None)
|
||||
if content_type is None:
|
||||
content_type = getattr(body, "properties", {}).get("contentType")
|
||||
return content or "", (content_type or "text").lower()
|
||||
|
||||
def _message_to_document(
|
||||
self,
|
||||
message: Any,
|
||||
replies: list[Any],
|
||||
team_id: str,
|
||||
team_name: str,
|
||||
channel_id: str,
|
||||
channel_name: str,
|
||||
) -> Document:
|
||||
"""Flatten a post and its replies into a single blob-based Document."""
|
||||
thread = [message, *replies]
|
||||
|
||||
contents = []
|
||||
content_type = "text"
|
||||
latest = None
|
||||
for item in thread:
|
||||
text, ctype = self._message_body(item)
|
||||
if text:
|
||||
contents.append(text)
|
||||
if ctype == "html":
|
||||
content_type = "html"
|
||||
modified = self._parse_dt(self._prop(item, "lastModifiedDateTime")) or self._parse_dt(
|
||||
self._prop(item, "createdDateTime")
|
||||
)
|
||||
if modified is not None and (latest is None or modified > latest):
|
||||
latest = modified
|
||||
|
||||
joined = "\n\n".join(contents)
|
||||
blob = joined.encode("utf-8")
|
||||
|
||||
snippet = joined.strip().replace("\n", " ")
|
||||
if len(snippet) > 50:
|
||||
snippet = snippet[:50].rstrip() + "..."
|
||||
semantic_identifier = f"{channel_name}: {snippet}" if snippet else f"{channel_name} message"
|
||||
|
||||
metadata = {"team": team_name, "channel": channel_name}
|
||||
web_url = self._prop(message, "web_url") or self._prop(message, "webUrl")
|
||||
if web_url:
|
||||
metadata["web_url"] = web_url
|
||||
|
||||
return Document(
|
||||
id=f"{team_id}__{channel_id}__{message.id}",
|
||||
source="teams",
|
||||
semantic_identifier=semantic_identifier,
|
||||
extension=".html" if content_type == "html" else ".txt",
|
||||
blob=blob,
|
||||
size_bytes=len(blob),
|
||||
doc_updated_at=latest or datetime.now(timezone.utc),
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
def _iter_channel_messages(self):
|
||||
"""Yield (team_id, team_name, channel_id, channel_name, message) tuples.
|
||||
|
||||
Uses ``get_all()`` for every collection so Microsoft Graph's
|
||||
``@odata.nextLink`` pages are followed; ``get().execute_query()`` would
|
||||
only return the first page and silently drop the rest on larger tenants.
|
||||
"""
|
||||
teams = self.graph_client.teams.get_all().execute_query()
|
||||
for team in teams:
|
||||
team_id = str(team.id)
|
||||
team_name = self._prop(team, "displayName") or team_id
|
||||
channels = team.channels.get_all().execute_query()
|
||||
for channel in channels:
|
||||
channel_id = str(channel.id)
|
||||
channel_name = self._prop(channel, "displayName") or channel_id
|
||||
messages = channel.messages.get_all().execute_query()
|
||||
for message in messages:
|
||||
yield team_id, team_name, channel_id, channel_name, message
|
||||
|
||||
def _generate_documents(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch,
|
||||
end: SecondsSinceUnixEpoch,
|
||||
) -> Generator[Document | ConnectorFailure, None, None]:
|
||||
"""Yield a Document per in-window channel post, or a failure per error."""
|
||||
if self.graph_client is None:
|
||||
raise ConnectorMissingCredentialError("Microsoft Teams")
|
||||
|
||||
for team_id, team_name, channel_id, channel_name, message in self._iter_channel_messages():
|
||||
try:
|
||||
modified = self._parse_dt(self._prop(message, "lastModifiedDateTime")) or self._parse_dt(
|
||||
self._prop(message, "createdDateTime")
|
||||
)
|
||||
if modified is not None:
|
||||
ts = modified.timestamp()
|
||||
# start is an exclusive lower bound; full reindex passes start=0.
|
||||
if not (start < ts <= end):
|
||||
continue
|
||||
|
||||
replies = list(message.replies.get_all().execute_query())
|
||||
yield self._message_to_document(
|
||||
message, replies, team_id, team_name, channel_id, channel_name
|
||||
)
|
||||
except Exception as e:
|
||||
logging.exception("Microsoft Teams failed to process message")
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=str(getattr(message, "id", "unknown")),
|
||||
document_link=self._prop(message, "web_url") or "",
|
||||
),
|
||||
failure_message=str(e),
|
||||
exception=e,
|
||||
)
|
||||
|
||||
# -- checkpointed connector interface ------------------------------------
|
||||
|
||||
def load_from_checkpoint(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch,
|
||||
end: SecondsSinceUnixEpoch,
|
||||
checkpoint: ConnectorCheckpoint,
|
||||
) -> Any:
|
||||
"""Load documents from checkpoint"""
|
||||
# Simplified implementation
|
||||
return []
|
||||
) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]:
|
||||
"""Yield a Document per channel post (with replies), then finish.
|
||||
|
||||
All teams/channels are enumerated in one pass, so the returned
|
||||
checkpoint always has ``has_more=False``.
|
||||
"""
|
||||
yield from self._generate_documents(start, end)
|
||||
return TeamsCheckpoint(has_more=False)
|
||||
|
||||
def load_from_checkpoint_with_perm_sync(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch,
|
||||
end: SecondsSinceUnixEpoch,
|
||||
checkpoint: ConnectorCheckpoint,
|
||||
) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]:
|
||||
"""Permission-aware variant.
|
||||
|
||||
Teams ACL -> ExternalAccess mapping is not yet wired through the sync
|
||||
pipeline (it does not persist ExternalAccess), so this currently yields
|
||||
the same documents as ``load_from_checkpoint``.
|
||||
"""
|
||||
return self.load_from_checkpoint(start, end, checkpoint)
|
||||
|
||||
def build_dummy_checkpoint(self) -> ConnectorCheckpoint:
|
||||
"""Build dummy checkpoint"""
|
||||
return TeamsCheckpoint()
|
||||
return TeamsCheckpoint(has_more=True)
|
||||
|
||||
def validate_checkpoint_json(self, checkpoint_json: str) -> ConnectorCheckpoint:
|
||||
"""Validate checkpoint JSON"""
|
||||
# Simplified implementation
|
||||
return TeamsCheckpoint()
|
||||
return TeamsCheckpoint(has_more=True)
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(
|
||||
self,
|
||||
callback: Any = None,
|
||||
) -> Any:
|
||||
"""Retrieve all simplified documents with permission sync"""
|
||||
# Simplified implementation
|
||||
return []
|
||||
) -> Generator[list[SlimDocument], None, None]:
|
||||
"""Yield batches of slim documents (ids only) for prune/permission sync."""
|
||||
if self.graph_client is None:
|
||||
raise ConnectorMissingCredentialError("Microsoft Teams")
|
||||
|
||||
batch: list[SlimDocument] = []
|
||||
for team_id, _team_name, channel_id, _channel_name, message in self._iter_channel_messages():
|
||||
batch.append(SlimDocument(id=f"{team_id}__{channel_id}__{message.id}"))
|
||||
if len(batch) >= self.batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
if batch:
|
||||
yield batch
|
||||
|
||||
Reference in New Issue
Block a user