From ea70663f095130e52b23358cbcf401469efe79e3 Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 18 Jun 2026 13:10:09 +0800 Subject: [PATCH] feat: support wecom websocket channel (#16175) Added WeCom chat channel websocket mode alongside the existing webhook mode, plus frontend support for selecting the connection type. --- api/channels/wecom/channel.py | 370 ++++++++++++++++-- web/src/assets/svg/chat-channel/wecom.svg | 2 +- web/src/assets/svg/chat-channel/wexin.svg | 1 + .../chat-channel/constant/index.tsx | 44 ++- .../pages/user-setting/chat-channel/index.tsx | 1 + 5 files changed, 377 insertions(+), 41 deletions(-) create mode 100644 web/src/assets/svg/chat-channel/wexin.svg diff --git a/api/channels/wecom/channel.py b/api/channels/wecom/channel.py index 0d56d466db..4ad04e1f20 100644 --- a/api/channels/wecom/channel.py +++ b/api/channels/wecom/channel.py @@ -1,10 +1,11 @@ from __future__ import annotations import asyncio +import json import logging import time from dataclasses import dataclass -from typing import Dict, Optional, Tuple +from typing import Any, Dict, Optional, Tuple import aiohttp from aiohttp import web @@ -18,16 +19,19 @@ from ..core.registry import register_channel LOGGER = logging.getLogger(__name__) WECOM_API_BASE = "https://qyapi.weixin.qq.com/cgi-bin" +WECOM_WS_URL = "wss://openws.work.weixin.qq.com" @dataclass class WeComAccount: account_id: str - corp_id: str - agent_id: int - secret: str - token: str - aes_key: str + connection_type: str = "webhook" + corp_id: str = "" + agent_id: int = 0 + secret: str = "" + token: str = "" + aes_key: str = "" + bot_id: str = "" webhook_host: str = "0.0.0.0" webhook_port: int = 3002 @@ -143,15 +147,39 @@ class WeComChannel(Channel): super().__init__() self.account = account self.account_id = account.account_id - self.crypto = WeChatCrypto( - account.token, account.aes_key, account.corp_id + self.connection_type = (account.connection_type or "webhook").strip().lower() + self.crypto = ( + WeChatCrypto(account.token, account.aes_key, account.corp_id) + if self.connection_type == "webhook" + else None ) self._server: Optional[_SharedWebhookServer] = None self._access_token: Optional[str] = None self._access_token_expires_at: float = 0.0 self._access_token_lock = asyncio.Lock() + self._ws_task: Optional[asyncio.Task] = None + self._ws: Optional[aiohttp.ClientWebSocketResponse] = None + self._ws_send_lock: Optional[asyncio.Lock] = None + self._heartbeat_task: Optional[asyncio.Task] = None + self._stop_requested = False async def start(self) -> None: + self._stop_requested = False + if self.connection_type == "websocket": + if self._ws_task and not self._ws_task.done(): + return + self._ws_send_lock = asyncio.Lock() + self._ws_task = asyncio.create_task( + self._run_websocket(), + name=f"wecom-ws-{self.account_id}", + ) + LOGGER.info( + "[wecom:%s] starting websocket client (bot_id=%s)", + self.account_id, + self.account.bot_id, + ) + return + self._server = await _acquire_server( self.account.webhook_host, self.account.webhook_port ) @@ -164,6 +192,27 @@ class WeComChannel(Channel): ) async def stop(self) -> None: + self._stop_requested = True + if self._heartbeat_task and not self._heartbeat_task.done(): + self._heartbeat_task.cancel() + try: + await self._heartbeat_task + except BaseException: + pass + self._heartbeat_task = None + if self._ws is not None and not self._ws.closed: + try: + await self._ws.close() + except Exception: + pass + if self._ws_task and not self._ws_task.done(): + self._ws_task.cancel() + try: + await self._ws_task + except BaseException: + pass + self._ws_task = None + self._ws = None if self._server is not None: self._server.channels.pop(self.account_id, None) await _release_server( @@ -171,27 +220,235 @@ class WeComChannel(Channel): ) self._server = None - async def handle_decrypted_message(self, msg) -> None: + async def _handle_text_message( + self, + *, + chat_id: str, + sender_id: str, + message_id: str, + text: str, + raw: Any, + chat_type: str = "p2p", + ) -> None: + try: + if not (text or "").strip(): + return + incoming = IncomingMessage( + channel=self.channel_id, + account_id=self.account_id, + chat_id=chat_id, + chat_type=chat_type, + message_id=message_id, + sender_id=sender_id, + text=text, + raw=raw, + ) + await self._dispatch(incoming) + except Exception: + LOGGER.error( + "[wecom:%s] inbound message handling error", + self.account_id, + exc_info=True, + ) + + async def handle_decrypted_message(self, msg) -> None: + # Short-connection webhook mode. try: - # Only handle plain text events; ignore image/voice/event etc. if getattr(msg, "type", "") != "text": return user_id = str(getattr(msg, "source", "") or "") if not user_id: return - incoming = IncomingMessage( - channel=self.channel_id, - account_id=self.account_id, + await self._handle_text_message( chat_id=user_id, - chat_type="p2p", - message_id=str(getattr(msg, "id", "") or ""), sender_id=user_id, + message_id=str(getattr(msg, "id", "") or ""), text=getattr(msg, "content", "") or "", raw=msg, + chat_type="p2p", ) - await self._dispatch(incoming) except Exception: - LOGGER.error("[wecom:%s] inbound message handling error", self.account_id, exc_info=True) + LOGGER.error( + "[wecom:%s] inbound message handling error", + self.account_id, + exc_info=True, + ) + + async def _run_websocket(self) -> None: + while not self._stop_requested: + try: + async with aiohttp.ClientSession() as session: + async with session.ws_connect(WECOM_WS_URL, heartbeat=None) as ws: + self._ws = ws + LOGGER.info( + "[wecom:%s] websocket connected", + self.account_id, + ) + await self._subscribe_websocket(ws) + self._heartbeat_task = asyncio.create_task( + self._heartbeat_loop(ws) + ) + async for msg in ws: + if self._stop_requested: + break + if msg.type == aiohttp.WSMsgType.TEXT: + await self._handle_ws_payload(msg.data) + elif msg.type == aiohttp.WSMsgType.BINARY: + await self._handle_ws_payload( + msg.data.decode("utf-8", "ignore") + ) + elif msg.type == aiohttp.WSMsgType.PONG: + LOGGER.debug("[wecom:%s] websocket pong", self.account_id) + elif msg.type in ( + aiohttp.WSMsgType.CLOSE, + aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR, + ): + break + except PermissionError as ex: + self._stop_requested = True + LOGGER.error( + "[wecom:%s] websocket auth failed; stop reconnecting: %s", + self.account_id, + ex, + ) + break + except asyncio.CancelledError: + break + except Exception: + LOGGER.error( + "[wecom:%s] websocket loop error", + self.account_id, + exc_info=True, + ) + finally: + if self._heartbeat_task and not self._heartbeat_task.done(): + self._heartbeat_task.cancel() + try: + await self._heartbeat_task + except BaseException: + pass + self._heartbeat_task = None + self._ws = None + if not self._stop_requested: + await asyncio.sleep(3) + + async def _heartbeat_loop(self, ws: aiohttp.ClientWebSocketResponse) -> None: + try: + while not self._stop_requested and not ws.closed: + await asyncio.sleep(25) + await ws.ping() + LOGGER.debug("[wecom:%s] websocket ping", self.account_id) + except asyncio.CancelledError: + pass + except Exception: + LOGGER.error("[wecom:%s] websocket heartbeat failed", self.account_id, exc_info=True) + + async def _subscribe_websocket(self, ws: aiohttp.ClientWebSocketResponse) -> None: + if not self.account.bot_id: + raise RuntimeError(f"wecom account '{self.account_id}' missing bot_id") + payload = { + "cmd": "aibot_subscribe", + "headers": {"req_id": f"req-{time.time_ns()}"}, + "body": { + "bot_id": self.account.bot_id, + "secret": self.account.secret, + }, + } + await ws.send_json(payload) + LOGGER.info("[wecom:%s] websocket subscribe sent", self.account_id) + try: + resp = await asyncio.wait_for(ws.receive_json(), timeout=10) + except Exception as err: + raise RuntimeError("wecom websocket subscribe ack timeout") from err + if not isinstance(resp, dict): + raise RuntimeError(f"wecom websocket subscribe response invalid: {resp!r}") + if resp.get("cmd") != "aibot_subscribe": + LOGGER.warning( + "[wecom:%s] unexpected subscribe response: %s", + self.account_id, + resp, + ) + errcode = int(resp.get("errcode", 0) or 0) + if errcode != 0: + if errcode == 853000: + raise PermissionError( + f"wecom websocket subscribe failed: invalid bot_id or secret: {resp}" + ) + raise RuntimeError(f"wecom websocket subscribe failed: {resp}") + LOGGER.info("[wecom:%s] websocket subscribed", self.account_id) + + async def _handle_ws_payload(self, payload: str) -> None: + if not payload: + return + try: + obj = json.loads(payload) + except json.JSONDecodeError: + LOGGER.error( + "[wecom:%s] invalid websocket payload: %r", + self.account_id, + payload[:200], + ) + return + cmd = str(obj.get("cmd") or "") + headers = obj.get("headers") or {} + body = obj.get("body") or {} + if cmd == "aibot_msg_callback": + await self._handle_ws_message(headers, body, obj) + return + if cmd == "aibot_event_callback": + await self._handle_ws_event(headers, body, obj) + return + if cmd in ("aibot_subscribe", "aibot_respond_msg", "aibot_respond_welcome_msg"): + errcode = obj.get("errcode") + if errcode not in (None, 0, "0"): + LOGGER.error("[wecom:%s] websocket response error: %s", self.account_id, obj) + else: + LOGGER.debug("[wecom:%s] websocket response: %s", self.account_id, obj) + return + LOGGER.debug("[wecom:%s] websocket ignored cmd=%s", self.account_id, cmd) + + async def _handle_ws_message(self, headers: Any, body: Any, raw: Any) -> None: + if not isinstance(body, dict): + return + msgtype = str(body.get("msgtype") or "") + if msgtype != "text": + return + sender = body.get("from") or {} + sender_id = str(sender.get("userid") or "") + if not sender_id: + return + chat_type = str(body.get("chattype") or "") + chat_id = str(body.get("chatid") or sender_id or "") + req_id = str((headers or {}).get("req_id") or body.get("msgid") or "") + content = str((body.get("text") or {}).get("content") or "") + await self._handle_text_message( + chat_id=chat_id or sender_id, + sender_id=sender_id, + message_id=req_id, + text=content, + raw=raw, + chat_type="group" if chat_type == "group" else "p2p", + ) + + async def _handle_ws_event(self, headers: Any, body: Any, raw: Any) -> None: + if not isinstance(body, dict): + return + event = body.get("event") or {} + event_type = str(event.get("eventtype") or "") + req_id = str((headers or {}).get("req_id") or body.get("msgid") or "") + LOGGER.info( + "[wecom:%s] websocket event=%s req_id=%s", + self.account_id, + event_type or "unknown", + req_id, + ) + if event_type == "disconnected_event": + self._stop_requested = True + if self._ws is not None and not self._ws.closed: + await self._ws.close() + return + # Other events are accepted but do not trigger the RAG handler. async def _get_access_token(self) -> str: async with self._access_token_lock: @@ -217,6 +474,10 @@ class WeComChannel(Channel): return self._access_token async def send(self, message: OutgoingMessage) -> None: + if self.connection_type == "websocket": + await self._send_websocket_message(message) + return + if not message.chat_id: LOGGER.error("[wecom:%s] missing chat_id; cannot send", self.account_id) return @@ -252,35 +513,76 @@ class WeComChannel(Channel): self._access_token_expires_at = 0.0 LOGGER.error("[wecom:%s] send failed: %s", self.account_id, data) + async def _send_websocket_message(self, message: OutgoingMessage) -> None: + if not message.text: + LOGGER.error("[wecom:%s] empty websocket reply text", self.account_id) + return + if self._ws is None or self._ws.closed: + LOGGER.error("[wecom:%s] websocket is not connected", self.account_id) + return + payload = { + "cmd": "aibot_send_msg", + "headers": {"req_id": f"req-{time.time_ns()}"}, + "body": { + "chatid": message.chat_id, + "msgtype": "markdown", + "markdown": {"content": message.text}, + }, + } + try: + if self._ws_send_lock is None: + self._ws_send_lock = asyncio.Lock() + async with self._ws_send_lock: + await self._ws.send_json(payload) + LOGGER.info( + "[wecom:%s] websocket reply sent chat_id=%s", + self.account_id, + message.chat_id, + ) + except Exception: + LOGGER.error("[wecom:%s] websocket send failed", self.account_id, exc_info=True) + def _build(account_id: str, cfg: dict) -> Channel: - required = ("corp_id", "agent_id", "secret", "token", "aes_key") + connection_type = str(cfg.get("connection_type") or "webhook").strip().lower() + if connection_type == "websocket": + required = ("bot_id", "secret") + else: + required = ("corp_id", "agent_id", "secret", "token", "aes_key") missing = [k for k in required if not cfg.get(k)] if missing: raise ValueError( f"wecom account '{account_id}' missing required fields: {missing}" ) - try: - agent_id = int(cfg["agent_id"]) - except (TypeError, ValueError) as err: - raise ValueError( - f"wecom account '{account_id}' agent_id must be int: {err}" - ) from err - # WeCom EncodingAESKey is always 43 characters; reject placeholders early so - # the failure is a clear message instead of a base64 "Incorrect padding" error. - aes_key = str(cfg["aes_key"]) - if len(aes_key) != 43: - raise ValueError( - f"wecom account '{account_id}' aes_key (EncodingAESKey) must be 43 characters, got {len(aes_key)}" - ) + agent_id = 0 + aes_key = "" + corp_id = str(cfg.get("corp_id") or "") + token = str(cfg.get("token") or "") + bot_id = str(cfg.get("bot_id") or "") + if connection_type == "webhook": + try: + agent_id = int(cfg["agent_id"]) + except (TypeError, ValueError) as err: + raise ValueError( + f"wecom account '{account_id}' agent_id must be int: {err}" + ) from err + # WeCom EncodingAESKey is always 43 characters; reject placeholders early so + # the failure is a clear message instead of a base64 "Incorrect padding" error. + aes_key = str(cfg["aes_key"]) + if len(aes_key) != 43: + raise ValueError( + f"wecom account '{account_id}' aes_key (EncodingAESKey) must be 43 characters, got {len(aes_key)}" + ) return WeComChannel( WeComAccount( account_id=account_id, - corp_id=str(cfg["corp_id"]), + connection_type=connection_type, + corp_id=corp_id, agent_id=agent_id, secret=str(cfg["secret"]), - token=str(cfg["token"]), - aes_key=str(cfg["aes_key"]), + token=token, + aes_key=aes_key, + bot_id=bot_id, webhook_host=str(cfg.get("webhook_host", "0.0.0.0")), webhook_port=int(cfg.get("webhook_port", 3002)), ) diff --git a/web/src/assets/svg/chat-channel/wecom.svg b/web/src/assets/svg/chat-channel/wecom.svg index 32009faeb2..74df635a92 100644 --- a/web/src/assets/svg/chat-channel/wecom.svg +++ b/web/src/assets/svg/chat-channel/wecom.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/web/src/assets/svg/chat-channel/wexin.svg b/web/src/assets/svg/chat-channel/wexin.svg new file mode 100644 index 0000000000..32009faeb2 --- /dev/null +++ b/web/src/assets/svg/chat-channel/wexin.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/web/src/pages/user-setting/chat-channel/constant/index.tsx b/web/src/pages/user-setting/chat-channel/constant/index.tsx index 8eea4fdf5c..64f43240d6 100644 --- a/web/src/pages/user-setting/chat-channel/constant/index.tsx +++ b/web/src/pages/user-setting/chat-channel/constant/index.tsx @@ -540,12 +540,41 @@ export const ChatChannelFormFields: Record = }, ], [ChatChannelKey.WECOM]: [ + { + label: 'Connection Type', + name: 'config.credential.connection_type', + type: FormFieldType.Select, + required: true, + defaultValue: 'webhook', + options: [ + { label: 'Webhook', value: 'webhook' }, + { label: 'WebSocket', value: 'websocket' }, + ], + }, + { + label: 'Bot ID', + name: 'config.credential.bot_id', + type: FormFieldType.Text, + required: true, + placeholder: 'AIBOTID', + shouldRender: (values: any) => + values?.config?.credential?.connection_type === 'websocket', + }, + { + label: 'Secret', + name: 'config.credential.secret', + type: FormFieldType.Password, + required: true, + placeholder: 'App Secret / Long-connection Secret', + }, { label: 'Corp ID', name: 'config.credential.corp_id', type: FormFieldType.Text, required: true, placeholder: 'ww1234567890abcdef', + shouldRender: (values: any) => + values?.config?.credential?.connection_type !== 'websocket', }, { label: 'Agent ID', @@ -553,18 +582,16 @@ export const ChatChannelFormFields: Record = type: FormFieldType.Number, required: true, placeholder: '1000001', - }, - { - label: 'Secret', - name: 'config.credential.secret', - type: FormFieldType.Password, - required: true, + shouldRender: (values: any) => + values?.config?.credential?.connection_type !== 'websocket', }, { label: 'Token', name: 'config.credential.token', type: FormFieldType.Password, required: true, + shouldRender: (values: any) => + values?.config?.credential?.connection_type !== 'websocket', }, { label: 'AES Key', @@ -572,6 +599,8 @@ export const ChatChannelFormFields: Record = type: FormFieldType.Password, required: true, placeholder: '43 chars', + shouldRender: (values: any) => + values?.config?.credential?.connection_type !== 'websocket', }, ], [ChatChannelKey.WHATSAPP]: [], @@ -646,6 +675,9 @@ export const ChatChannelFormDefaultValues: Record< // googlechat carries a non-credential discriminator (auth_mode). ChatChannelFormDefaultValues[ChatChannelKey.GOOGLECHAT].config.auth_mode = 'webhook_url'; +ChatChannelFormDefaultValues[ + ChatChannelKey.WECOM +].config.credential.connection_type = 'webhook'; export const getChatChannelFields = ( key?: ChatChannelKey, diff --git a/web/src/pages/user-setting/chat-channel/index.tsx b/web/src/pages/user-setting/chat-channel/index.tsx index f1e5751eb0..71b12ebc8f 100644 --- a/web/src/pages/user-setting/chat-channel/index.tsx +++ b/web/src/pages/user-setting/chat-channel/index.tsx @@ -64,6 +64,7 @@ const ChatChannel = () => { ChatChannelKey.FEISHU, ChatChannelKey.TELEGRAM, ChatChannelKey.QQBOT, + ChatChannelKey.WECOM, ].includes(id), // Show only selected chat channels ) .map((id) => ({