diff --git a/common/data_source/slack_connector.py b/common/data_source/slack_connector.py
index 162826762c..441d4b6e9f 100644
--- a/common/data_source/slack_connector.py
+++ b/common/data_source/slack_connector.py
@@ -37,7 +37,6 @@ from common.data_source.models import (
Document,
DocumentFailure,
SlimDocument,
- TextSection,
SecondsSinceUnixEpoch,
GenerateSlimDocumentOutput, MessageType, SlackMessageFilterReason, ChannelType, ThreadType, ProcessedSlackMessage,
CheckpointOutput
@@ -201,7 +200,10 @@ def thread_to_doc(
]
valid_experts = [expert for expert in experts if expert]
- first_message = slack_cleaner.index_clean(cast(str, thread[0]["text"]))
+ cleaned_messages = [
+ slack_cleaner.index_clean(cast(str, m["text"])) for m in thread
+ ]
+ first_message = cleaned_messages[0] if cleaned_messages else ""
snippet = (
first_message[:50].rstrip() + "..."
if len(first_message) > 50
@@ -212,21 +214,22 @@ def thread_to_doc(
"\n", " "
)
+ # The Document model is blob-based (no sections), so flatten the thread's
+ # cleaned messages into a single UTF-8 text blob.
+ content = "\n\n".join(cleaned_messages)
+ blob = content.encode("utf-8")
+
return Document(
id=_build_doc_id(channel_id=channel_id, thread_ts=thread[0]["ts"]),
- sections=[
- TextSection(
- link=get_message_link(event=m, client=client, channel_id=channel_id),
- text=slack_cleaner.index_clean(cast(str, m["text"])),
- )
- for m in thread
- ],
source="slack",
semantic_identifier=doc_sem_id,
+ extension=".txt",
+ blob=blob,
+ size_bytes=len(blob),
doc_updated_at=get_latest_message_time(thread),
primary_owners=valid_experts,
metadata={"Channel": channel["name"]},
- external_access=channel_access,
+ externale_access=channel_access,
)
@@ -540,6 +543,79 @@ class SlackConnector(
callback=callback,
)
+    def _fetch_document_batches(
+        self,
+        oldest: str | None = None,
+        latest: str | None = None,
+        callback: Any = None,
+    ) -> Generator[list[Document], None, None]:
+        """Yield thread documents from the selected channels in batches.
+
+        Shared by ``load_from_state`` (no bounds) and ``poll_source``, which
+        forwards ``oldest`` / ``latest`` to ``get_channel_messages``. Batches
+        hold ``self.batch_size`` documents; a trailing partial batch is yielded
+        last. Raises ``ConnectorMissingCredentialError`` if the client is unset.
+        """
+        if self.client is None or self.text_cleaner is None:
+            raise ConnectorMissingCredentialError("Slack")
+
+        selected_channels = filter_channels(
+            get_channels(self.client), self.channels, self.channel_regex_enabled
+        )
+
+        pending: list[Document] = []
+        for channel in selected_channels:
+            threads_seen: set[str] = set()
+            history_pages = get_channel_messages(
+                client=self.client,
+                channel=channel,
+                oldest=oldest,
+                latest=latest,
+                callback=callback,
+            )
+            for page in history_pages:
+                for message in page:
+                    outcome = _process_message(
+                        message=message,
+                        client=self.client,
+                        channel=channel,
+                        slack_cleaner=self.text_cleaner,
+                        user_cache=self.user_cache,
+                        seen_thread_ts=threads_seen,
+                        channel_access=None,
+                    )
+                    # Track the timestamp first so it is kept even if skipped below.
+                    if outcome.thread_or_message_ts:
+                        threads_seen.add(outcome.thread_or_message_ts)
+                    if outcome.failure is not None:
+                        logging.warning(
+                            "Slack message processing failure: %s",
+                            outcome.failure.failure_message,
+                        )
+                        continue
+                    if outcome.doc is None:
+                        continue
+
+                    pending.append(outcome.doc)
+                    if len(pending) >= self.batch_size:
+                        yield pending
+                        pending = []
+
+        if pending:
+            yield pending
+
+    def load_from_state(self) -> Generator[list[Document], None, None]:
+        """Full sync: delegate to ``_fetch_document_batches`` with no time bounds."""
+        return self._fetch_document_batches()
+
+    def poll_source(
+        self,
+        start: SecondsSinceUnixEpoch,
+        end: SecondsSinceUnixEpoch,
+    ) -> Generator[list[Document], None, None]:
+        """Incremental sync: ``start``/``end`` are stringified into ``oldest``/``latest``."""
+        return self._fetch_document_batches(oldest=str(start), latest=str(end))
+
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
@@ -602,6 +678,16 @@ class SlackConnector(
f"Slack API returned a failure: {error_msg}"
)
+ # 3) Confirm users:read scope is available (required by thread_to_doc)
+ users_resp = self.fast_client.users_info(user="USLACKBOT")
+ if not users_resp.get("ok", False):
+ error_msg = users_resp.get("error", "")
+ if error_msg in ("missing_scope", "not_allowed_token_type"):
+ raise InsufficientPermissionsError(
+ "Slack bot token lacks the 'users:read' scope required to look up message senders. "
+ "Please add 'users:read' to your Slack app's OAuth scopes."
+ )
+
except SlackApiError as e:
slack_error = e.response.get("error", "")
if slack_error == "ratelimited":
diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py
index 3b01014327..76ba60a3fe 100644
--- a/rag/svr/sync_data_source.py
+++ b/rag/svr/sync_data_source.py
@@ -61,6 +61,7 @@ from common.data_source import (
RDBMSConnector,
DingTalkAITableConnector,
RestAPIConnector,
+ SlackConnector,
SharePointConnector,
)
from common.data_source.models import ConnectorFailure, SeafileSyncScope
@@ -999,7 +1000,58 @@ class Slack(SyncBase):
SOURCE_NAME: str = FileSource.SLACK
async def _generate(self, task: dict):
- pass
+        """Configure a SlackConnector from ``self.conf`` and return its batch generator."""
+        from common.data_source.config import DocumentSource
+        from common.data_source.interfaces import StaticCredentialsProvider
+
+        # ``channels`` may be a comma-separated string or a list; else it stays unset.
+        channels_conf = self.conf.get("channels")
+        if isinstance(channels_conf, str):
+            channels = [c.strip() for c in channels_conf.split(",") if c.strip()]
+        elif isinstance(channels_conf, list):
+            channels = [str(c).strip() for c in channels_conf if str(c).strip()]
+        else:
+            channels = None
+
+        # Non-numeric or non-positive batch sizes fall back to INDEX_BATCH_SIZE.
+        raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
+        try:
+            batch_size = int(raw_batch_size)
+        except (TypeError, ValueError):
+            batch_size = INDEX_BATCH_SIZE
+        if batch_size <= 0:
+            batch_size = INDEX_BATCH_SIZE
+
+        self.connector = SlackConnector(
+            channels=channels or None,
+            channel_regex_enabled=bool(self.conf.get("channel_regex_enabled", False)),
+            batch_size=batch_size,
+        )
+
+        credentials = self.conf.get("credentials") or {}
+        if not credentials.get("slack_bot_token"):
+            raise ValueError("Slack connector is missing the bot token credential.")
+
+        credentials_provider = StaticCredentialsProvider(
+            tenant_id=task["tenant_id"],
+            connector_name=DocumentSource.SLACK,
+            credential_json=credentials,
+        )
+        self.connector.set_credentials_provider(credentials_provider)
+        self.connector.validate_connector_settings()
+
+        # Full sync on reindex or when no poll start exists; otherwise poll from
+        # ``poll_range_start`` up to the current UTC time.
+        poll_start = task["poll_range_start"]
+        if task["reindex"] == "1" or not poll_start:
+            document_generator = self.connector.load_from_state()
+        else:
+            end_time = datetime.now(timezone.utc).timestamp()
+            document_generator = self.connector.poll_source(poll_start.timestamp(), end_time)
+
+        scope = ", ".join(channels) if channels else "all"
+        self.log_connection("Slack", f"channels({scope})", task)
+        return document_generator
class Teams(SyncBase):
diff --git a/test/testcases/restful_api/test_documents.py b/test/testcases/restful_api/test_documents.py
index 05a1743d0c..e0084c13b5 100644
--- a/test/testcases/restful_api/test_documents.py
+++ b/test/testcases/restful_api/test_documents.py
@@ -64,16 +64,19 @@ def test_documents_upload_and_list(rest_client, create_dataset, tmp_path):
assert any(doc["name"] == fp.name for doc in list_payload["data"]["docs"]), list_payload
-def _upload_files(rest_client, dataset_id, file_paths):
+def _upload_files(rest_client, dataset_id, file_paths, timeout=None):
with ExitStack() as stack:
files = [("file", (fp.name, stack.enter_context(fp.open("rb")))) for fp in file_paths]
- return rest_client.post(f"/datasets/{dataset_id}/documents", files=files)
+ kwargs = {"files": files}
+ if timeout is not None:
+ kwargs["timeout"] = timeout
+ return rest_client.post(f"/datasets/{dataset_id}/documents", **kwargs)
-def _seed_documents(rest_client, create_dataset, tmp_path, count=5):
+def _seed_documents(rest_client, create_dataset, tmp_path, count=5, timeout=None):
dataset_id = create_dataset("dataset_list_contract")
file_paths = [create_txt_file(tmp_path / f"ragflow_test_upload_{i}.txt") for i in range(count)]
- res = _upload_files(rest_client, dataset_id, file_paths)
+ res = _upload_files(rest_client, dataset_id, file_paths, timeout=timeout)
assert res.status_code == 200
payload = res.json()
assert payload["code"] == 0, payload
@@ -1166,7 +1169,9 @@ def test_documents_delete_invalid_dataset_partial_duplicate_repeat_and_cross_dat
@pytest.mark.p2
def test_documents_delete_concurrent_and_bulk_contract(rest_client, create_dataset, tmp_path):
- dataset_id, uploaded_docs = _seed_documents(rest_client, create_dataset, tmp_path, count=60)
+ dataset_id, uploaded_docs = _seed_documents(
+ rest_client, create_dataset, tmp_path, count=60, timeout=120
+ )
document_ids = [doc["id"] for doc in uploaded_docs]
with ThreadPoolExecutor(max_workers=8) as executor:
@@ -1192,9 +1197,15 @@ def test_documents_delete_concurrent_and_bulk_contract(rest_client, create_datas
assert list_after_payload["code"] == 0, list_after_payload
assert list_after_payload["data"]["total"] == 0, list_after_payload
- bulk_dataset_id, bulk_docs = _seed_documents(rest_client, create_dataset, tmp_path, count=120)
+ bulk_dataset_id, bulk_docs = _seed_documents(
+ rest_client, create_dataset, tmp_path, count=120, timeout=120
+ )
bulk_ids = [doc["id"] for doc in bulk_docs]
- bulk_delete_res = rest_client.delete(f"/datasets/{bulk_dataset_id}/documents", json={"ids": bulk_ids})
+ bulk_delete_res = rest_client.delete(
+ f"/datasets/{bulk_dataset_id}/documents",
+ json={"ids": bulk_ids},
+ timeout=120,
+ )
assert bulk_delete_res.status_code == 200
bulk_delete_payload = bulk_delete_res.json()
assert bulk_delete_payload["code"] == 0, bulk_delete_payload
diff --git a/test/unit_test/data_source/test_slack_connector_unit.py b/test/unit_test/data_source/test_slack_connector_unit.py
new file mode 100644
index 0000000000..56a9551e65
--- /dev/null
+++ b/test/unit_test/data_source/test_slack_connector_unit.py
@@ -0,0 +1,206 @@
+#
+# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import importlib.util
+import sys
+from pathlib import Path
+from types import ModuleType
+
+import pytest
+
+
+def _load_slack_connector_module():
+    """Load slack_connector.py in isolation, mirroring the Dropbox unit test.
+
+    ``common.data_source`` is replaced by a bare package stub so its
+    ``__init__`` (and every connector's heavy dependencies) is not executed.
+    Entries under that package in ``sys.modules`` are restored afterwards.
+    """
+    repo_root = Path(__file__).resolve().parents[3]
+    package_name = "common.data_source"
+    saved_modules = {
+        name: module
+        for name, module in sys.modules.items()
+        if name == package_name or name.startswith(f"{package_name}.")
+    }
+    package_stub = ModuleType(package_name)
+    package_stub.__path__ = [str(repo_root / "common" / "data_source")]
+    sys.modules[package_name] = package_stub
+
+    try:
+        spec = importlib.util.spec_from_file_location(
+            "_slack_connector_under_test",
+            repo_root / "common" / "data_source" / "slack_connector.py",
+        )
+        module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(module)
+        return module
+    finally:
+        for name in list(sys.modules):
+            if name == package_name or name.startswith(f"{package_name}."):
+                if name in saved_modules:
+                    sys.modules[name] = saved_modules[name]
+                else:
+                    sys.modules.pop(name, None)
+
+
+slack_connector = _load_slack_connector_module()
+SlackConnector = slack_connector.SlackConnector
+SlackTextCleaner = slack_connector.SlackTextCleaner
+
+
+class _FakeResponse(dict):
+    """Dict stand-in for slack_sdk's SlackResponse; ``validate()`` and ``data`` return self."""
+
+    def validate(self):
+        return self
+
+    @property
+    def data(self):
+        return self
+
+
+class _FakeSlackClient:
+    """In-memory Slack client: one channel, one standalone message, one thread."""
+
+    def __init__(self):
+        self.token = "xoxb-test"
+        self.joined = []
+        alice_profile = {"display_name": "alice", "first_name": "Alice", "last_name": "A"}
+        self._users = {
+            "U1": {"real_name": "Alice", "profile": alice_profile},
+            "U2": {"real_name": "Bob", "profile": {"display_name": "bob"}},
+            "U3": {"real_name": "Carol", "profile": {"display_name": "carol"}},
+        }
+
+    def conversations_list(self, cursor=None, limit=None, **kwargs):
+        """Return a single member channel, ``general`` (``C1``)."""
+        general = {"id": "C1", "name": "general", "is_member": True, "is_private": False}
+        return _FakeResponse(
+            {
+                "channels": [general],
+                "response_metadata": {"next_cursor": ""},
+            }
+        )
+
+    def conversations_history(self, cursor=None, limit=None, channel=None, oldest=None, latest=None, **kwargs):
+        """Return a standalone message plus the parent of thread ``2.0``."""
+        standalone = {"ts": "1.0", "user": "U1", "text": "Hello world"}
+        thread_parent = {"ts": "2.0", "user": "U2", "text": "Question?", "thread_ts": "2.0"}
+        return _FakeResponse(
+            {
+                "messages": [standalone, thread_parent],
+                "response_metadata": {"next_cursor": ""},
+            }
+        )
+
+    def conversations_replies(self, cursor=None, limit=None, channel=None, ts=None, **kwargs):
+        """Return thread ``2.0``: the parent message followed by one reply."""
+        parent = {"ts": "2.0", "user": "U2", "text": "Question?", "thread_ts": "2.0"}
+        reply = {"ts": "3.0", "user": "U3", "text": "Answer!", "thread_ts": "2.0"}
+        return _FakeResponse(
+            {
+                "messages": [parent, reply],
+                "response_metadata": {"next_cursor": ""},
+            }
+        )
+
+    def users_info(self, user=None, **kwargs):
+        return _FakeResponse({"ok": True, "user": self._users.get(user, {})})
+
+
+def _connector_with_fake_client(client, batch_size=10):
+    """Build a connector and assign ``client`` and a text cleaner to it directly."""
+    connector = SlackConnector(batch_size=batch_size)
+    connector.client, connector.text_cleaner = client, SlackTextCleaner(client=client)
+    return connector
+
+
+# --- credential loading -----------------------------------------------------
+
+
+@pytest.mark.p2
+def test_load_credentials_is_not_supported():
+    slack = SlackConnector()
+    with pytest.raises(NotImplementedError):
+        slack.load_credentials({"slack_bot_token": "xoxb-abc"})
+
+
+@pytest.mark.p2
+def test_set_credentials_provider_initializes_clients():
+    """A provider that returns a bot token populates all three client attributes."""
+
+    class _TokenProvider:
+        def get_credentials(self):
+            return {"slack_bot_token": "xoxb-abc"}
+
+    slack = SlackConnector()
+    slack.set_credentials_provider(_TokenProvider())
+
+    for attr in ("client", "fast_client", "text_cleaner"):
+        assert getattr(slack, attr) is not None
+
+
+@pytest.mark.p2
+def test_fetch_without_credentials_raises():
+    unconfigured = SlackConnector()
+    with pytest.raises(slack_connector.ConnectorMissingCredentialError):
+        list(unconfigured.load_from_state())
+
+
+# --- document generation ----------------------------------------------------
+
+
+@pytest.mark.p1
+def test_load_from_state_generates_thread_documents():
+    """Full sync yields one doc for the standalone message and one for the thread."""
+    connector = _connector_with_fake_client(_FakeSlackClient())
+    docs = [doc for batch in connector.load_from_state() for doc in batch]
+
+    # The thread's parent and reply collapse into a single document.
+    assert [doc.id for doc in docs] == ["C1__1.0", "C1__2.0"]
+
+    standalone, thread_doc = docs
+    assert standalone.source == "slack"
+    assert standalone.extension == ".txt"
+    assert standalone.blob == b"Hello world"
+    assert standalone.size_bytes == len(b"Hello world")
+    assert standalone.metadata == {"Channel": "general"}
+    # get_semantic_name() prefers real_name ("Alice") over the display_name.
+    assert standalone.semantic_identifier == "Alice in #general: Hello world"
+
+    # Thread messages are flattened into a single blob, joined by blank lines.
+    thread_text = "Question?\n\nAnswer!"
+    assert thread_doc.blob == thread_text.encode("utf-8")
+    assert thread_doc.size_bytes == len(thread_doc.blob)
+    assert thread_doc.semantic_identifier == "Bob in #general: Question?"
+
+
+@pytest.mark.p1
+def test_poll_source_passes_time_window():
+    """``poll_source`` bounds reach ``conversations_history`` as strings."""
+    seen_bounds = {}
+
+    def _record_history(cursor=None, limit=None, channel=None, oldest=None, latest=None, **kwargs):
+        seen_bounds.update(oldest=oldest, latest=latest)
+        return _FakeResponse({"messages": [], "response_metadata": {"next_cursor": ""}})
+
+    client = _FakeSlackClient()
+    client.conversations_history = _record_history
+    connector = _connector_with_fake_client(client)
+
+    list(connector.poll_source(100.0, 200.0))
+
+    assert seen_bounds == {"oldest": "100.0", "latest": "200.0"}
diff --git a/web/src/assets/svg/data-source/slack.svg b/web/src/assets/svg/data-source/slack.svg
new file mode 100644
index 0000000000..72d1110062
--- /dev/null
+++ b/web/src/assets/svg/data-source/slack.svg
@@ -0,0 +1,6 @@
+
diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts
index 1f8f1fe117..86e8986a37 100644
--- a/web/src/locales/en.ts
+++ b/web/src/locales/en.ts
@@ -1238,6 +1238,12 @@ Example: Virtual Hosted Style`,
'Upload the OAuth JSON generated from Google Console. If it only contains client credentials, run the browser-based verification once to mint long-lived refresh tokens.',
dropboxDescription:
'Connect your Dropbox to sync files and folders from a chosen account.',
+ slackDescription:
+ 'Connect your Slack workspace to sync channel messages and threads.',
+ slackBotTokenTip:
+ 'Slack bot user OAuth token (starts with xoxb-). The app needs the channels:read, channels:history, and users:read scopes.',
+ slackChannelsTip:
+ 'Optional: channel names to sync (e.g., general). Leave empty to sync all accessible channels.',
sharepointDescription:
'Connect a SharePoint site via Microsoft Graph to sync its document libraries.',
sharepointSiteUrlTip:
diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts
index cd0c6709cb..9f799ffc3f 100644
--- a/web/src/locales/zh.ts
+++ b/web/src/locales/zh.ts
@@ -1099,6 +1099,11 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系
gmailTokenTip:
'请上传由 Google Console 生成的 OAuth JSON。如果仅包含 client credentials,请通过浏览器授权一次以获取长期有效的刷新 Token。',
dropboxDescription: '连接 Dropbox,同步指定账号下的文件与文件夹。',
+ slackDescription: '连接你的 Slack 工作区,同步频道消息与讨论串。',
+ slackBotTokenTip:
+ 'Slack 机器人用户 OAuth Token(以 xoxb- 开头)。应用需具备 channels:read、channels:history 和 users:read 权限。',
+ slackChannelsTip:
+ '可选:需要同步的频道名称(例如 general)。留空则同步所有可访问的频道。',
sharepointDescription: '通过 Microsoft Graph 连接 SharePoint 站点,同步其文档库。',
sharepointSiteUrlTip:
'要索引的 SharePoint 站点完整 URL,例如 https://contoso.sharepoint.com/sites/MySite。需要具备 Sites.Read.All 与 Files.Read.All 应用权限(管理员同意)的 Azure AD 应用。',
diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx
index 026570eca3..587132c4a9 100644
--- a/web/src/pages/user-setting/data-source/constant/index.tsx
+++ b/web/src/pages/user-setting/data-source/constant/index.tsx
@@ -43,9 +43,9 @@ export enum DataSourceKey {
POSTGRESQL = 'postgresql',
REST_API = 'rest_api',
RSS = 'rss',
+ SLACK = 'slack',
SHAREPOINT = 'sharepoint',
- // SLACK = 'slack',
// TEAMS = 'teams',
}
@@ -130,6 +130,12 @@ export const DataSourceFeatureVisibilityMap: Partial<
[DataSourceKey.MOODLE]: {
syncDeletedFiles: true,
},
+ [DataSourceKey.SLACK]: {
+ syncDeletedFiles: true,
+ },
+ [DataSourceKey.SHAREPOINT]: {
+ syncDeletedFiles: true,
+ },
[DataSourceKey.MYSQL]: {
syncDeletedFiles: true,
},
@@ -213,6 +219,11 @@ export const generateDataSourceInfo = (t: TFunction) => {
description: t(`setting.${DataSourceKey.MOODLE}Description`),
icon: ,
},
+ [DataSourceKey.SLACK]: {
+ name: 'Slack',
+ description: t(`setting.${DataSourceKey.SLACK}Description`),
+ icon: ,
+ },
[DataSourceKey.SHAREPOINT]: {
name: 'SharePoint',
description: t(`setting.${DataSourceKey.SHAREPOINT}Description`),
@@ -659,6 +670,22 @@ export const DataSourceFormFields = {
required: true,
},
],
+ [DataSourceKey.SLACK]: [
+ {
+ label: 'Slack Bot Token',
+ name: 'config.credentials.slack_bot_token',
+ type: FormFieldType.Password,
+ required: true,
+ tooltip: t('setting.slackBotTokenTip'),
+ },
+ {
+ label: 'Channels',
+ name: 'config.channels',
+ type: FormFieldType.Tag,
+ required: false,
+ tooltip: t('setting.slackChannelsTip'),
+ },
+ ],
[DataSourceKey.SHAREPOINT]: [
{
label: 'Site URL',
@@ -1542,6 +1569,16 @@ export const DataSourceFormDefaultValues = {
},
},
},
+ [DataSourceKey.SLACK]: {
+ name: '',
+ source: DataSourceKey.SLACK,
+ config: {
+ channels: [],
+ credentials: {
+ slack_bot_token: '',
+ },
+ },
+ },
[DataSourceKey.SHAREPOINT]: {
name: '',
source: DataSourceKey.SHAREPOINT,