diff --git a/api/channels/bootstrap.py b/api/channels/bootstrap.py
index c55295a438..db86c2dc03 100644
--- a/api/channels/bootstrap.py
+++ b/api/channels/bootstrap.py
@@ -33,7 +33,7 @@ import threading
LOGGER = logging.getLogger(__name__)
# Channel packages bundled under api/channels that self-register on import.
-_BUNDLED_CHANNELS = ("feishu", "discord", "telegram", "line", "wecom", "qqbot")
+_BUNDLED_CHANNELS = ("feishu", "discord", "telegram", "line", "wecom", "qqbot", "dingtalk")
# How often (seconds) to reconcile running channels against the database.
_RECONCILE_INTERVAL_SECS = 10
diff --git a/api/channels/dingtalk/__init__.py b/api/channels/dingtalk/__init__.py
new file mode 100644
index 0000000000..08f075b2b2
--- /dev/null
+++ b/api/channels/dingtalk/__init__.py
@@ -0,0 +1,2 @@
+from .channel import _build # noqa: F401
+
diff --git a/api/channels/dingtalk/channel.py b/api/channels/dingtalk/channel.py
new file mode 100644
index 0000000000..e0394bc93d
--- /dev/null
+++ b/api/channels/dingtalk/channel.py
@@ -0,0 +1,466 @@
+from __future__ import annotations
+
+import hashlib
+import asyncio
+import json
+import logging
+import time
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
+
+import aiohttp
+
+from ..core.base import Channel, IncomingMessage, OutgoingMessage
+from ..core.registry import register_channel
+
+LOGGER = logging.getLogger(__name__)
+
+DINGTALK_API_BASE = "https://api.dingtalk.com"
+DINGTALK_WS_FALLBACK = "wss://wss-open-connection.dingtalk.com:443/connect"
+DINGTALK_STREAM_TOPIC = "/v1.0/im/bot/messages/get"
+DINGTALK_MESSAGE_TTL_SECS = 3600
+
+
+@dataclass
+class DingTalkAccount:
+ account_id: str
+ client_id: str
+ client_secret: str
+
+
+class DingTalkChannel(Channel):
+ channel_id = "dingtalk"
+
+ def __init__(self, account: DingTalkAccount) -> None:
+ super().__init__()
+ self.account = account
+ self.account_id = account.account_id
+ self._stream_task: Optional[asyncio.Task] = None
+ self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
+ self._stop_requested = False
+ self._session_webhooks: Dict[str, str] = {}
+ self._processed_message_ids: Dict[str, float] = {}
+ self._inflight_message_ids: Dict[str, float] = {}
+
+ async def start(self) -> None:
+ self._stop_requested = False
+ if self._stream_task and not self._stream_task.done():
+ return
+ LOGGER.info(
+ "[dingtalk:%s] starting stream client (client_id=%s)",
+ self.account_id,
+ self._mask(self.account.client_id),
+ )
+ self._stream_task = asyncio.create_task(
+ self._run_stream(),
+ name=f"dingtalk-stream-{self.account_id}",
+ )
+
+ async def stop(self) -> None:
+ self._stop_requested = True
+ if self._ws is not None and not self._ws.closed:
+ try:
+ await self._ws.close()
+ except Exception:
+ LOGGER.debug("[dingtalk:%s] websocket close error", self.account_id, exc_info=True)
+ if self._stream_task and not self._stream_task.done():
+ self._stream_task.cancel()
+ try:
+ await self._stream_task
+ except BaseException:
+ pass
+ self._ws = None
+ self._stream_task = None
+ self._session_webhooks.clear()
+ self._processed_message_ids.clear()
+ self._inflight_message_ids.clear()
+
+ async def send(self, message: OutgoingMessage) -> None:
+ session_webhook = self._session_webhooks.get(message.chat_id)
+ if not session_webhook:
+ LOGGER.warning(
+ "[dingtalk:%s] no sessionWebhook cached for chat_id=%s; dropping reply",
+ self.account_id,
+ message.chat_id,
+ )
+ return
+
+ payload = {
+ "msgtype": "markdown",
+ "markdown": {
+ "title": "RAGFlow",
+ "text": message.text,
+ },
+ }
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(session_webhook, json=payload) as resp:
+ body = await resp.text()
+ if resp.status >= 400:
+ LOGGER.error(
+ "[dingtalk:%s] reply failed: status=%s body=%s",
+ self.account_id,
+ resp.status,
+ body[:500],
+ )
+ else:
+ LOGGER.debug(
+ "[dingtalk:%s] reply sent: status=%s chat_id=%s",
+ self.account_id,
+ resp.status,
+ message.chat_id,
+ )
+ except Exception:
+ LOGGER.error("[dingtalk:%s] send failed", self.account_id, exc_info=True)
+
+ async def _run_stream(self) -> None:
+ backoff = 3
+ while not self._stop_requested:
+ try:
+ self._session_webhooks.clear()
+ endpoint, ticket = await self._open_connection()
+ async with aiohttp.ClientSession() as session:
+ ws = await self._connect_websocket(session, endpoint, ticket)
+ self._ws = ws
+ LOGGER.info(
+ "[dingtalk:%s] websocket connected endpoint=%s",
+ self.account_id,
+ endpoint,
+ )
+ await self._subscribe(ws)
+ async for msg in ws:
+ if self._stop_requested:
+ break
+ try:
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ await self._handle_ws_payload(msg.data)
+ elif msg.type == aiohttp.WSMsgType.BINARY:
+ await self._handle_ws_payload(
+ msg.data.decode("utf-8", "ignore")
+ )
+ elif msg.type in (
+ aiohttp.WSMsgType.CLOSE,
+ aiohttp.WSMsgType.CLOSED,
+ aiohttp.WSMsgType.ERROR,
+ ):
+ break
+ except Exception:
+ LOGGER.warning(
+ "[dingtalk:%s] dropping malformed websocket message",
+ self.account_id,
+ exc_info=True,
+ )
+ backoff = 3
+ except asyncio.CancelledError:
+ break
+ except Exception:
+ LOGGER.error("[dingtalk:%s] websocket loop error", self.account_id, exc_info=True)
+ finally:
+ if self._ws is not None and not self._ws.closed:
+ try:
+ await self._ws.close()
+ except Exception:
+ pass
+ self._ws = None
+
+ if not self._stop_requested:
+ await asyncio.sleep(backoff)
+ backoff = min(backoff * 2, 30)
+
+ async def _connect_websocket(
+ self,
+ session: aiohttp.ClientSession,
+ endpoint: str,
+ ticket: str,
+ ) -> aiohttp.ClientWebSocketResponse:
+ candidates = [
+ ("query", self._build_ws_url(endpoint, ticket), {}),
+ ("header", endpoint, {"ticket": ticket}),
+ ("bare", endpoint, {}),
+ ]
+ last_error: Exception | None = None
+ for mode, url, headers in candidates:
+ try:
+ LOGGER.info(
+ "[dingtalk:%s] trying websocket connect mode=%s url=%s",
+ self.account_id,
+ mode,
+ url,
+ )
+ return await session.ws_connect(url, heartbeat=30, headers=headers or None)
+ except Exception as exc:
+ last_error = exc
+ LOGGER.warning(
+ "[dingtalk:%s] websocket connect failed mode=%s: %s",
+ self.account_id,
+ mode,
+ exc,
+ )
+ if last_error is not None:
+ raise last_error
+ raise RuntimeError("websocket connect failed")
+
+ async def _open_connection(self) -> tuple[str, str]:
+ payload = {
+ "clientId": self.account.client_id,
+ "clientSecret": self.account.client_secret,
+ "subscriptions": [
+ {
+ "type": "CALLBACK",
+ "topic": DINGTALK_STREAM_TOPIC,
+ }
+ ],
+ }
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ f"{DINGTALK_API_BASE}/v1.0/gateway/connections/open",
+ json=payload,
+ headers={"Accept": "application/json"},
+ ) as resp:
+ text = await resp.text()
+ if resp.status != 200:
+ raise RuntimeError(f"status: {resp.status}, response: {text}")
+ try:
+ data = json.loads(text)
+ except json.JSONDecodeError as exc:
+ raise RuntimeError(f"invalid open response: {text[:500]}") from exc
+
+ endpoint = str(data.get("endpoint") or "").strip() or DINGTALK_WS_FALLBACK
+ ticket = str(data.get("ticket") or "").strip()
+ if not ticket:
+ raise RuntimeError(f"connections/open response missing ticket: {text[:500]}")
+ LOGGER.info(
+ "[dingtalk:%s] connections/open ok endpoint=%s ticket=%s",
+ self.account_id,
+ endpoint,
+ self._mask(ticket),
+ )
+ return endpoint, ticket
+
+ async def _subscribe(self, ws: aiohttp.ClientWebSocketResponse) -> None:
+ # DingTalk Stream uses the `connections/open` ticket to authorize the
+ # websocket connection. We keep this hook for future protocol-specific
+ # frames, but the current minimal integration does not emit anything.
+ _ = ws
+
+ async def _handle_ws_payload(self, payload: str) -> None:
+ if not payload:
+ return
+ try:
+ obj = json.loads(payload)
+ except json.JSONDecodeError:
+ LOGGER.error(
+ "[dingtalk:%s] invalid websocket payload: %r",
+ self.account_id,
+ payload[:200],
+ )
+ return
+
+ data: Any = obj
+ headers = obj.get("headers") if isinstance(obj, dict) else {}
+ if not isinstance(headers, dict):
+ headers = {}
+ if isinstance(obj, dict):
+ if isinstance(obj.get("data"), str):
+ try:
+ data = json.loads(obj["data"])
+ except json.JSONDecodeError:
+ data = obj["data"]
+ elif isinstance(obj.get("data"), dict):
+ data = obj["data"]
+
+ if not isinstance(data, dict):
+ return
+
+ session_webhook = str(data.get("sessionWebhook") or obj.get("sessionWebhook") or "").strip()
+ callback_message_id = str(
+ headers.get("messageId")
+ or obj.get("messageId")
+ or data.get("messageId")
+ or data.get("msgId")
+ or obj.get("msgId")
+ or ""
+ ).strip()
+ chat_id = str(
+ data.get("conversationId")
+ or data.get("chatId")
+ or data.get("openConversationId")
+ or data.get("msgId")
+ or ""
+ ).strip()
+ sender_id = str(
+ data.get("senderId")
+ or data.get("senderStaffId")
+ or data.get("userId")
+ or ""
+ ).strip()
+ message_id = str(data.get("msgId") or obj.get("msgId") or "").strip()
+ chat_type = str(data.get("conversationType") or "").strip()
+ text = self._extract_text(data)
+ dedup_key = self._build_dedup_key(data, obj)
+
+ if session_webhook and chat_id:
+ self._session_webhooks[chat_id] = session_webhook
+
+ if not text.strip() or not chat_id or not sender_id:
+ return
+
+ await self._ack_callback(callback_message_id, data)
+ self._prune_message_cache()
+ if dedup_key:
+ if dedup_key in self._processed_message_ids:
+ LOGGER.debug(
+ "[dingtalk:%s] skipping processed duplicate message=%s",
+ self.account_id,
+ dedup_key,
+ )
+ return
+ if dedup_key in self._inflight_message_ids:
+ LOGGER.debug(
+ "[dingtalk:%s] skipping inflight duplicate message=%s",
+ self.account_id,
+ dedup_key,
+ )
+ return
+ self._inflight_message_ids[dedup_key] = time.time()
+
+ try:
+ incoming = IncomingMessage(
+ channel=self.channel_id,
+ account_id=self.account_id,
+ chat_id=chat_id,
+ chat_type=chat_type,
+ message_id=message_id or dedup_key or "",
+ sender_id=sender_id,
+ text=text,
+ raw=data,
+ )
+ await self._dispatch(incoming)
+ finally:
+ if dedup_key:
+ self._inflight_message_ids.pop(dedup_key, None)
+ self._processed_message_ids[dedup_key] = time.time()
+
+ async def _ack_callback(self, message_id: str, data: Dict[str, Any]) -> None:
+ if not message_id or self._ws is None or self._ws.closed:
+ return
+ payload = {
+ "messageId": message_id,
+ "response": {"success": True},
+ }
+ try:
+ await self._ws.send_json(payload)
+ LOGGER.debug(
+ "[dingtalk:%s] acked callback messageId=%s msgId=%s",
+ self.account_id,
+ self._mask(message_id),
+ self._mask(str(data.get("msgId") or "")),
+ )
+ except Exception:
+ LOGGER.warning(
+ "[dingtalk:%s] failed to ack callback messageId=%s",
+ self.account_id,
+ self._mask(message_id),
+ exc_info=True,
+ )
+
+ @staticmethod
+ def _extract_text(data: Dict[str, Any]) -> str:
+ text = data.get("text")
+ if isinstance(text, dict):
+ content = text.get("content")
+ if isinstance(content, str):
+ return content
+ if isinstance(text, str):
+ return text
+ content = data.get("content")
+ if isinstance(content, dict):
+ for key in ("text", "content", "recognition"):
+ value = content.get(key)
+ if isinstance(value, str) and value.strip():
+ return value
+ if isinstance(content, str):
+ return content
+ return ""
+
+ @staticmethod
+ def _build_ws_url(endpoint: str, ticket: str) -> str:
+ parts = urlsplit(endpoint)
+ query = dict(parse_qsl(parts.query, keep_blank_values=True))
+ query.setdefault("ticket", ticket)
+ return urlunsplit(
+ (parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment)
+ )
+
+ def _build_dedup_key(self, data: Dict[str, Any], obj: Dict[str, Any]) -> str:
+ message_id = str(data.get("msgId") or obj.get("msgId") or "").strip()
+ if message_id:
+ return f"msg:{message_id}"
+
+ callback_message_id = str(
+ obj.get("headers", {}).get("messageId")
+ or obj.get("messageId")
+ or data.get("messageId")
+ or ""
+ ).strip()
+ if callback_message_id:
+ return f"callback:{callback_message_id}"
+
+ conversation_id = str(
+ data.get("conversationId")
+ or data.get("chatId")
+ or data.get("openConversationId")
+ or ""
+ ).strip()
+ sender_id = str(
+ data.get("senderId")
+ or data.get("senderStaffId")
+ or data.get("userId")
+ or ""
+ ).strip()
+ text = self._extract_text(data).strip()
+ event_ts = str(
+ data.get("eventTime")
+ or obj.get("eventTime")
+ or data.get("timestamp")
+ or obj.get("timestamp")
+ or ""
+ ).strip()
+ if conversation_id and sender_id and text:
+ digest = hashlib.sha1(text.encode("utf-8")).hexdigest()[:16]
+ suffix = f":{event_ts}" if event_ts else ""
+ return f"fallback:{conversation_id}:{sender_id}{suffix}:{digest}"
+ return ""
+
+ def _prune_message_cache(self) -> None:
+ now = time.time()
+ cutoff = now - DINGTALK_MESSAGE_TTL_SECS
+ for cache in (self._processed_message_ids, self._inflight_message_ids):
+ stale = [key for key, ts in cache.items() if ts < cutoff]
+ for key in stale:
+ cache.pop(key, None)
+
+ @staticmethod
+ def _mask(value: str) -> str:
+ if len(value) <= 8:
+ return "***"
+ return f"{value[:4]}...{value[-4:]}"
+
+
+def _build(account_id: str, cfg: dict) -> Channel:
+ credential = cfg.get("credential") or cfg
+ client_id = credential.get("client_id") or credential.get("clientId")
+ client_secret = credential.get("client_secret") or credential.get("clientSecret")
+ if not client_id or not client_secret:
+ raise ValueError(f"dingtalk account '{account_id}' is missing client_id or client_secret")
+ return DingTalkChannel(
+ DingTalkAccount(
+ account_id=account_id,
+ client_id=str(client_id),
+ client_secret=str(client_secret),
+ )
+ )
+
+
+register_channel("dingtalk", _build)
diff --git a/web/src/assets/svg/chat-channel/dingtalk.svg b/web/src/assets/svg/chat-channel/dingtalk.svg
new file mode 100644
index 0000000000..ff2d53516f
--- /dev/null
+++ b/web/src/assets/svg/chat-channel/dingtalk.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts
index b11faf62b1..2d6df5b729 100644
--- a/web/src/locales/en.ts
+++ b/web/src/locales/en.ts
@@ -1497,6 +1497,7 @@ Example: Virtual Hosted Style`,
chatChannelDesc: {
clickclack: 'Connect a ClickClack bot',
discord: 'Connect a Discord bot',
+ dingtalk: 'Connect a DingTalk bot',
feishu: 'Connect a Feishu / Lark bot',
googlechat: 'Connect a Google Chat bot',
irc: 'Connect to an IRC server',
diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts
index 4714ca1fce..f2fc3b4a9f 100644
--- a/web/src/locales/zh.ts
+++ b/web/src/locales/zh.ts
@@ -1226,6 +1226,7 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系
chatChannelDesc: {
clickclack: '连接 ClickClack 机器人',
discord: '连接 Discord 机器人',
+ dingtalk: '连接钉钉机器人',
feishu: '连接飞书 / Lark 机器人',
googlechat: '连接 Google Chat 机器人',
irc: '连接 IRC 服务器',
diff --git a/web/src/pages/user-setting/chat-channel/constant/index.tsx b/web/src/pages/user-setting/chat-channel/constant/index.tsx
index 623baa4bec..2bc697bf64 100644
--- a/web/src/pages/user-setting/chat-channel/constant/index.tsx
+++ b/web/src/pages/user-setting/chat-channel/constant/index.tsx
@@ -8,6 +8,7 @@ import { IChatChannelInfoMap } from '../interface';
export enum ChatChannelKey {
CLICKCLACK = 'clickclack',
DISCORD = 'discord',
+ DINGTALK = 'dingtalk',
FEISHU = 'feishu',
GOOGLECHAT = 'googlechat',
IRC = 'irc',
@@ -48,6 +49,7 @@ const channelIcon = (key: ChatChannelKey) => (
const CHANNEL_NAMES: Record = {
[ChatChannelKey.CLICKCLACK]: 'ClickClack',
[ChatChannelKey.DISCORD]: 'Discord',
+ [ChatChannelKey.DINGTALK]: 'DingTalk',
[ChatChannelKey.FEISHU]: 'Feishu / Lark',
[ChatChannelKey.GOOGLECHAT]: 'Google Chat',
[ChatChannelKey.IRC]: 'IRC',
@@ -160,6 +162,21 @@ export const ChatChannelFormFields: Record =
placeholder: '1234567890',
},
],
+ [ChatChannelKey.DINGTALK]: [
+ {
+ label: 'Client ID',
+ name: 'config.credential.client_id',
+ type: FormFieldType.Text,
+ required: true,
+ placeholder: 'dingxxxxxxxxxxxx',
+ },
+ {
+ label: 'Client Secret',
+ name: 'config.credential.client_secret',
+ type: FormFieldType.Password,
+ required: true,
+ },
+ ],
[ChatChannelKey.FEISHU]: [
{
label: 'App ID',
diff --git a/web/src/pages/user-setting/chat-channel/index.tsx b/web/src/pages/user-setting/chat-channel/index.tsx
index 71b12ebc8f..a530642e3d 100644
--- a/web/src/pages/user-setting/chat-channel/index.tsx
+++ b/web/src/pages/user-setting/chat-channel/index.tsx
@@ -61,6 +61,7 @@ const ChatChannel = () => {
(id) =>
[
ChatChannelKey.DISCORD,
+ ChatChannelKey.DINGTALK,
ChatChannelKey.FEISHU,
ChatChannelKey.TELEGRAM,
ChatChannelKey.QQBOT,