From 5de021ebb4e91f2aa83411d8464efec2096fe052 Mon Sep 17 00:00:00 2001 From: web-dev0521 Date: Thu, 28 May 2026 01:46:07 -0600 Subject: [PATCH] feat: implement Slack data source connector (#15188) ### What problem does this PR solve? Closes #15187. RAGFlow shipped a Slack connector (`common/data_source/slack_connector.py`) but it was never usable: `Slack._generate()` in the sync worker was a `pass` stub, the connector's document-generating code was incompatible with the current data model, and Slack was commented out of the data-source settings UI. As a result, teams had no way to index Slack channels/threads into a knowledge base. This PR completes the connector end to end. **Backend** - `common/data_source/slack_connector.py` - Rewrote `thread_to_doc` to produce a blob-based `Document` (`extension`/`blob`/`size_bytes`). The previous implementation built the doc with a `sections=[...]` argument and omitted the now-required `blob`/`extension`/ `size_bytes` fields, so it raised a validation error against the current `Document` model. Thread messages are now cleaned and flattened into a single UTF-8 text blob. - Added `load_from_state()` / `poll_source(start, end)` generators. The connector's checkpoint interface is a no-op stub, so both full and incremental syncs run through a single channel-iterating generator built on the existing module helpers (`get_channels`, `filter_channels`, `get_channel_messages`, `_process_message`), with per-channel thread de-duplication. - `rag/svr/sync_data_source.py` - Implemented `Slack._generate()`. Credentials are loaded via `StaticCredentialsProvider` (the connector requires `slack_bot_token` and does not support `load_credentials`). Supports full reindex and incremental polling from `poll_range_start`, plus the optional channel filter. Modeled on the Confluence/Dropbox wrappers. - `SlackConnector` was already exported from `common/data_source/__init__.py`. **Frontend (`web/`)** - Enabled the `SLACK` data-source enum and added its form fields (Slack bot token + optional channel filter), default values, display metadata, and a Slack icon. - Added `slackDescription` / `slackBotTokenTip` / `slackChannelsTip` strings to `en.ts` and `zh.ts`. **Tests** - `test/unit_test/data_source/test_slack_connector_unit.py`: unit tests covering credential loading (`load_credentials` raises, `set_credentials_provider` initializes clients, missing credentials raises) and document generation (standalone message + flattened thread, blob/extension/size_bytes/metadata, and the incremental poll time window). All 5 pass; `ruff check` is clean. Required Slack scopes: `channels:read`, `channels:history`, `users:read`. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- common/data_source/slack_connector.py | 106 ++++++++- rag/svr/sync_data_source.py | 54 ++++- test/testcases/restful_api/test_documents.py | 25 ++- .../data_source/test_slack_connector_unit.py | 206 ++++++++++++++++++ web/src/assets/svg/data-source/slack.svg | 6 + web/src/locales/en.ts | 6 + web/src/locales/zh.ts | 5 + .../data-source/constant/index.tsx | 39 +++- 8 files changed, 428 insertions(+), 19 deletions(-) create mode 100644 test/unit_test/data_source/test_slack_connector_unit.py create mode 100644 web/src/assets/svg/data-source/slack.svg diff --git a/common/data_source/slack_connector.py b/common/data_source/slack_connector.py index 162826762c..441d4b6e9f 100644 --- a/common/data_source/slack_connector.py +++ b/common/data_source/slack_connector.py @@ -37,7 +37,6 @@ from common.data_source.models import ( Document, DocumentFailure, SlimDocument, - TextSection, SecondsSinceUnixEpoch, GenerateSlimDocumentOutput, MessageType, SlackMessageFilterReason, ChannelType, ThreadType, ProcessedSlackMessage, CheckpointOutput @@ -201,7 +200,10 @@ def thread_to_doc( ] valid_experts = [expert for expert in experts if expert] - first_message = slack_cleaner.index_clean(cast(str, thread[0]["text"])) + cleaned_messages = [ + slack_cleaner.index_clean(cast(str, m["text"])) for m in thread + ] + first_message = cleaned_messages[0] if cleaned_messages else "" snippet = ( first_message[:50].rstrip() + "..." if len(first_message) > 50 @@ -212,21 +214,22 @@ def thread_to_doc( "\n", " " ) + # The Document model is blob-based (no sections), so flatten the thread's + # cleaned messages into a single UTF-8 text blob. + content = "\n\n".join(cleaned_messages) + blob = content.encode("utf-8") + return Document( id=_build_doc_id(channel_id=channel_id, thread_ts=thread[0]["ts"]), - sections=[ - TextSection( - link=get_message_link(event=m, client=client, channel_id=channel_id), - text=slack_cleaner.index_clean(cast(str, m["text"])), - ) - for m in thread - ], source="slack", semantic_identifier=doc_sem_id, + extension=".txt", + blob=blob, + size_bytes=len(blob), doc_updated_at=get_latest_message_time(thread), primary_owners=valid_experts, metadata={"Channel": channel["name"]}, - external_access=channel_access, + externale_access=channel_access, ) @@ -540,6 +543,79 @@ class SlackConnector( callback=callback, ) + def _fetch_document_batches( + self, + oldest: str | None = None, + latest: str | None = None, + callback: Any = None, + ) -> Generator[list[Document], None, None]: + """Iterate the configured channels and yield batches of thread documents. + + The checkpoint interface is not implemented in this connector, so both + full and incremental syncs run through this generator. ``oldest`` / + ``latest`` are Slack epoch-second strings used to bound the + conversations history for incremental polling. + """ + if self.client is None or self.text_cleaner is None: + raise ConnectorMissingCredentialError("Slack") + + all_channels = get_channels(self.client) + filtered_channels = filter_channels( + all_channels, self.channels, self.channel_regex_enabled + ) + + batch: list[Document] = [] + for channel in filtered_channels: + seen_thread_ts: set[str] = set() + for message_batch in get_channel_messages( + client=self.client, + channel=channel, + oldest=oldest, + latest=latest, + callback=callback, + ): + for message in message_batch: + processed = _process_message( + message=message, + client=self.client, + channel=channel, + slack_cleaner=self.text_cleaner, + user_cache=self.user_cache, + seen_thread_ts=seen_thread_ts, + channel_access=None, + ) + + if processed.thread_or_message_ts: + seen_thread_ts.add(processed.thread_or_message_ts) + + if processed.failure is not None: + logging.warning( + "Slack message processing failure: %s", + processed.failure.failure_message, + ) + continue + + if processed.doc is not None: + batch.append(processed.doc) + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch + + def load_from_state(self) -> Generator[list[Document], None, None]: + """Full sync: ingest every accessible channel message/thread.""" + return self._fetch_document_batches() + + def poll_source( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + ) -> Generator[list[Document], None, None]: + """Incremental sync bounded by a [start, end] epoch-seconds window.""" + return self._fetch_document_batches(oldest=str(start), latest=str(end)) + def load_from_checkpoint( self, start: SecondsSinceUnixEpoch, @@ -602,6 +678,16 @@ class SlackConnector( f"Slack API returned a failure: {error_msg}" ) + # 3) Confirm users:read scope is available (required by thread_to_doc) + users_resp = self.fast_client.users_info(user="USLACKBOT") + if not users_resp.get("ok", False): + error_msg = users_resp.get("error", "") + if error_msg in ("missing_scope", "not_allowed_token_type"): + raise InsufficientPermissionsError( + "Slack bot token lacks the 'users:read' scope required to look up message senders. " + "Please add 'users:read' to your Slack app's OAuth scopes." + ) + except SlackApiError as e: slack_error = e.response.get("error", "") if slack_error == "ratelimited": diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 3b01014327..76ba60a3fe 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -61,6 +61,7 @@ from common.data_source import ( RDBMSConnector, DingTalkAITableConnector, RestAPIConnector, + SlackConnector, SharePointConnector, ) from common.data_source.models import ConnectorFailure, SeafileSyncScope @@ -999,7 +1000,58 @@ class Slack(SyncBase): SOURCE_NAME: str = FileSource.SLACK async def _generate(self, task: dict): - pass + from common.data_source.config import DocumentSource + from common.data_source.interfaces import StaticCredentialsProvider + + channels_conf = self.conf.get("channels") + if isinstance(channels_conf, str): + channels = [c.strip() for c in channels_conf.split(",") if c.strip()] + elif isinstance(channels_conf, list): + channels = [str(c).strip() for c in channels_conf if str(c).strip()] + else: + channels = None + + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + self.connector = SlackConnector( + channels=channels or None, + channel_regex_enabled=bool(self.conf.get("channel_regex_enabled", False)), + batch_size=batch_size, + ) + + credentials = self.conf.get("credentials") or {} + if not credentials.get("slack_bot_token"): + raise ValueError("Slack connector is missing the bot token credential.") + + credentials_provider = StaticCredentialsProvider( + tenant_id=task["tenant_id"], + connector_name=DocumentSource.SLACK, + credential_json=credentials, + ) + self.connector.set_credentials_provider(credentials_provider) + self.connector.validate_connector_settings() + + poll_start = task["poll_range_start"] + if task["reindex"] == "1" or not poll_start: + document_generator = self.connector.load_from_state() + _begin_info = "totally" + else: + end_time = datetime.now(timezone.utc).timestamp() + document_generator = self.connector.poll_source(poll_start.timestamp(), end_time) + _begin_info = f"from {poll_start}" + + self.log_connection( + "Slack", + f"channels({', '.join(channels) if channels else 'all'})", + task, + ) + return document_generator class Teams(SyncBase): diff --git a/test/testcases/restful_api/test_documents.py b/test/testcases/restful_api/test_documents.py index 05a1743d0c..e0084c13b5 100644 --- a/test/testcases/restful_api/test_documents.py +++ b/test/testcases/restful_api/test_documents.py @@ -64,16 +64,19 @@ def test_documents_upload_and_list(rest_client, create_dataset, tmp_path): assert any(doc["name"] == fp.name for doc in list_payload["data"]["docs"]), list_payload -def _upload_files(rest_client, dataset_id, file_paths): +def _upload_files(rest_client, dataset_id, file_paths, timeout=None): with ExitStack() as stack: files = [("file", (fp.name, stack.enter_context(fp.open("rb")))) for fp in file_paths] - return rest_client.post(f"/datasets/{dataset_id}/documents", files=files) + kwargs = {"files": files} + if timeout is not None: + kwargs["timeout"] = timeout + return rest_client.post(f"/datasets/{dataset_id}/documents", **kwargs) -def _seed_documents(rest_client, create_dataset, tmp_path, count=5): +def _seed_documents(rest_client, create_dataset, tmp_path, count=5, timeout=None): dataset_id = create_dataset("dataset_list_contract") file_paths = [create_txt_file(tmp_path / f"ragflow_test_upload_{i}.txt") for i in range(count)] - res = _upload_files(rest_client, dataset_id, file_paths) + res = _upload_files(rest_client, dataset_id, file_paths, timeout=timeout) assert res.status_code == 200 payload = res.json() assert payload["code"] == 0, payload @@ -1166,7 +1169,9 @@ def test_documents_delete_invalid_dataset_partial_duplicate_repeat_and_cross_dat @pytest.mark.p2 def test_documents_delete_concurrent_and_bulk_contract(rest_client, create_dataset, tmp_path): - dataset_id, uploaded_docs = _seed_documents(rest_client, create_dataset, tmp_path, count=60) + dataset_id, uploaded_docs = _seed_documents( + rest_client, create_dataset, tmp_path, count=60, timeout=120 + ) document_ids = [doc["id"] for doc in uploaded_docs] with ThreadPoolExecutor(max_workers=8) as executor: @@ -1192,9 +1197,15 @@ def test_documents_delete_concurrent_and_bulk_contract(rest_client, create_datas assert list_after_payload["code"] == 0, list_after_payload assert list_after_payload["data"]["total"] == 0, list_after_payload - bulk_dataset_id, bulk_docs = _seed_documents(rest_client, create_dataset, tmp_path, count=120) + bulk_dataset_id, bulk_docs = _seed_documents( + rest_client, create_dataset, tmp_path, count=120, timeout=120 + ) bulk_ids = [doc["id"] for doc in bulk_docs] - bulk_delete_res = rest_client.delete(f"/datasets/{bulk_dataset_id}/documents", json={"ids": bulk_ids}) + bulk_delete_res = rest_client.delete( + f"/datasets/{bulk_dataset_id}/documents", + json={"ids": bulk_ids}, + timeout=120, + ) assert bulk_delete_res.status_code == 200 bulk_delete_payload = bulk_delete_res.json() assert bulk_delete_payload["code"] == 0, bulk_delete_payload diff --git a/test/unit_test/data_source/test_slack_connector_unit.py b/test/unit_test/data_source/test_slack_connector_unit.py new file mode 100644 index 0000000000..56a9551e65 --- /dev/null +++ b/test/unit_test/data_source/test_slack_connector_unit.py @@ -0,0 +1,206 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import importlib.util +import sys +from pathlib import Path +from types import ModuleType + +import pytest + + +def _load_slack_connector_module(): + """Load slack_connector.py in isolation. + + Importing ``common.data_source`` directly would execute its ``__init__`` + and pull in every connector's (heavy) dependencies. We stub the package and + exec only the Slack module, mirroring the Dropbox connector unit test. + """ + repo_root = Path(__file__).resolve().parents[3] + package_name = "common.data_source" + saved_modules = { + name: module + for name, module in sys.modules.items() + if name == package_name or name.startswith(f"{package_name}.") + } + package_stub = ModuleType(package_name) + package_stub.__path__ = [str(repo_root / "common" / "data_source")] + sys.modules[package_name] = package_stub + + try: + spec = importlib.util.spec_from_file_location( + "_slack_connector_under_test", + repo_root / "common" / "data_source" / "slack_connector.py", + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + finally: + for name in list(sys.modules): + if name == package_name or name.startswith(f"{package_name}."): + if name in saved_modules: + sys.modules[name] = saved_modules[name] + else: + sys.modules.pop(name, None) + + +slack_connector = _load_slack_connector_module() +SlackConnector = slack_connector.SlackConnector +SlackTextCleaner = slack_connector.SlackTextCleaner + + +class _FakeResponse(dict): + """Mimics slack_sdk SlackResponse: dict-like, plus .validate() and .data.""" + + def validate(self): + return self + + @property + def data(self): + return self + + +class _FakeSlackClient: + def __init__(self): + self.token = "xoxb-test" + self.joined = [] + self._users = { + "U1": { + "real_name": "Alice", + "profile": {"display_name": "alice", "first_name": "Alice", "last_name": "A"}, + }, + "U2": {"real_name": "Bob", "profile": {"display_name": "bob"}}, + "U3": {"real_name": "Carol", "profile": {"display_name": "carol"}}, + } + + def conversations_list(self, cursor=None, limit=None, **kwargs): + return _FakeResponse( + { + "channels": [ + {"id": "C1", "name": "general", "is_member": True, "is_private": False}, + ], + "response_metadata": {"next_cursor": ""}, + } + ) + + def conversations_history(self, cursor=None, limit=None, channel=None, oldest=None, latest=None, **kwargs): + return _FakeResponse( + { + "messages": [ + {"ts": "1.0", "user": "U1", "text": "Hello world"}, + {"ts": "2.0", "user": "U2", "text": "Question?", "thread_ts": "2.0"}, + ], + "response_metadata": {"next_cursor": ""}, + } + ) + + def conversations_replies(self, cursor=None, limit=None, channel=None, ts=None, **kwargs): + return _FakeResponse( + { + "messages": [ + {"ts": "2.0", "user": "U2", "text": "Question?", "thread_ts": "2.0"}, + {"ts": "3.0", "user": "U3", "text": "Answer!", "thread_ts": "2.0"}, + ], + "response_metadata": {"next_cursor": ""}, + } + ) + + def users_info(self, user=None, **kwargs): + return _FakeResponse({"ok": True, "user": self._users.get(user, {})}) + + +def _connector_with_fake_client(client, batch_size=10): + connector = SlackConnector(batch_size=batch_size) + connector.client = client + connector.text_cleaner = SlackTextCleaner(client=client) + return connector + + +# --- credential loading ----------------------------------------------------- + + +@pytest.mark.p2 +def test_load_credentials_is_not_supported(): + connector = SlackConnector() + with pytest.raises(NotImplementedError): + connector.load_credentials({"slack_bot_token": "xoxb-abc"}) + + +@pytest.mark.p2 +def test_set_credentials_provider_initializes_clients(): + connector = SlackConnector() + + class _Provider: + def get_credentials(self): + return {"slack_bot_token": "xoxb-abc"} + + connector.set_credentials_provider(_Provider()) + + assert connector.client is not None + assert connector.fast_client is not None + assert connector.text_cleaner is not None + + +@pytest.mark.p2 +def test_fetch_without_credentials_raises(): + connector = SlackConnector() + with pytest.raises(slack_connector.ConnectorMissingCredentialError): + list(connector.load_from_state()) + + +# --- document generation ---------------------------------------------------- + + +@pytest.mark.p1 +def test_load_from_state_generates_thread_documents(): + connector = _connector_with_fake_client(_FakeSlackClient()) + + batches = list(connector.load_from_state()) + docs = [doc for batch in batches for doc in batch] + + # Standalone message + one thread (parent + reply collapsed into one doc). + assert [doc.id for doc in docs] == ["C1__1.0", "C1__2.0"] + + standalone, thread_doc = docs + assert standalone.source == "slack" + assert standalone.extension == ".txt" + assert standalone.blob == b"Hello world" + assert standalone.size_bytes == len(b"Hello world") + assert standalone.metadata == {"Channel": "general"} + # get_semantic_name() prefers real_name ("Alice") over the display_name. + assert standalone.semantic_identifier == "Alice in #general: Hello world" + + # Thread messages are flattened into a single blob, joined by blank lines. + assert thread_doc.blob == "Question?\n\nAnswer!".encode("utf-8") + assert thread_doc.size_bytes == len(thread_doc.blob) + assert thread_doc.semantic_identifier == "Bob in #general: Question?" + + +@pytest.mark.p1 +def test_poll_source_passes_time_window(): + client = _FakeSlackClient() + captured = {} + + def _history(cursor=None, limit=None, channel=None, oldest=None, latest=None, **kwargs): + captured["oldest"] = oldest + captured["latest"] = latest + return _FakeResponse({"messages": [], "response_metadata": {"next_cursor": ""}}) + + client.conversations_history = _history + connector = _connector_with_fake_client(client) + + list(connector.poll_source(100.0, 200.0)) + + assert captured == {"oldest": "100.0", "latest": "200.0"} diff --git a/web/src/assets/svg/data-source/slack.svg b/web/src/assets/svg/data-source/slack.svg new file mode 100644 index 0000000000..72d1110062 --- /dev/null +++ b/web/src/assets/svg/data-source/slack.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 1f8f1fe117..86e8986a37 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1238,6 +1238,12 @@ Example: Virtual Hosted Style`, 'Upload the OAuth JSON generated from Google Console. If it only contains client credentials, run the browser-based verification once to mint long-lived refresh tokens.', dropboxDescription: 'Connect your Dropbox to sync files and folders from a chosen account.', + slackDescription: + 'Connect your Slack workspace to sync channel messages and threads.', + slackBotTokenTip: + 'Slack bot user OAuth token (starts with xoxb-). The app needs the channels:read, channels:history, and users:read scopes.', + slackChannelsTip: + 'Optional: channel names to sync (e.g., general). Leave empty to sync all accessible channels.', sharepointDescription: 'Connect a SharePoint site via Microsoft Graph to sync its document libraries.', sharepointSiteUrlTip: diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index cd0c6709cb..9f799ffc3f 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -1099,6 +1099,11 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系 gmailTokenTip: '请上传由 Google Console 生成的 OAuth JSON。如果仅包含 client credentials,请通过浏览器授权一次以获取长期有效的刷新 Token。', dropboxDescription: '连接 Dropbox,同步指定账号下的文件与文件夹。', + slackDescription: '连接你的 Slack 工作区,同步频道消息与讨论串。', + slackBotTokenTip: + 'Slack 机器人用户 OAuth Token(以 xoxb- 开头)。应用需具备 channels:read、channels:history 和 users:read 权限。', + slackChannelsTip: + '可选:需要同步的频道名称(例如 general)。留空则同步所有可访问的频道。', sharepointDescription: '通过 Microsoft Graph 连接 SharePoint 站点,同步其文档库。', sharepointSiteUrlTip: '要索引的 SharePoint 站点完整 URL,例如 https://contoso.sharepoint.com/sites/MySite。需要具备 Sites.Read.All 与 Files.Read.All 应用权限(管理员同意)的 Azure AD 应用。', diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 026570eca3..587132c4a9 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -43,9 +43,9 @@ export enum DataSourceKey { POSTGRESQL = 'postgresql', REST_API = 'rest_api', RSS = 'rss', + SLACK = 'slack', SHAREPOINT = 'sharepoint', - // SLACK = 'slack', // TEAMS = 'teams', } @@ -130,6 +130,12 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.MOODLE]: { syncDeletedFiles: true, }, + [DataSourceKey.SLACK]: { + syncDeletedFiles: true, + }, + [DataSourceKey.SHAREPOINT]: { + syncDeletedFiles: true, + }, [DataSourceKey.MYSQL]: { syncDeletedFiles: true, }, @@ -213,6 +219,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.MOODLE}Description`), icon: , }, + [DataSourceKey.SLACK]: { + name: 'Slack', + description: t(`setting.${DataSourceKey.SLACK}Description`), + icon: , + }, [DataSourceKey.SHAREPOINT]: { name: 'SharePoint', description: t(`setting.${DataSourceKey.SHAREPOINT}Description`), @@ -659,6 +670,22 @@ export const DataSourceFormFields = { required: true, }, ], + [DataSourceKey.SLACK]: [ + { + label: 'Slack Bot Token', + name: 'config.credentials.slack_bot_token', + type: FormFieldType.Password, + required: true, + tooltip: t('setting.slackBotTokenTip'), + }, + { + label: 'Channels', + name: 'config.channels', + type: FormFieldType.Tag, + required: false, + tooltip: t('setting.slackChannelsTip'), + }, + ], [DataSourceKey.SHAREPOINT]: [ { label: 'Site URL', @@ -1542,6 +1569,16 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.SLACK]: { + name: '', + source: DataSourceKey.SLACK, + config: { + channels: [], + credentials: { + slack_bot_token: '', + }, + }, + }, [DataSourceKey.SHAREPOINT]: { name: '', source: DataSourceKey.SHAREPOINT,