mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
### What problem does this PR solve? #15844 Adds a **Chat channels** capability so a RAGFlow assistant (Dialog) can be exposed as a bot on external messaging platforms (Feishu/Lark, Discord, Telegram, Slack, WeCom, LINE, etc.). An admin configures a bot in the UI, connects it to an assistant, and inbound messages are answered from that assistant's knowledge base — replies are delivered back on the channel. **Feishu/Lark is implemented and tested end-to-end.** Discord, Telegram, LINE, and WeCom are scaffolded against the same interface; the remaining listed channels are tracked as follow-ups. ### Design **Backend** - New `chat_channel` table (`tenant_id`, `name`, `channel`, `config` JSON holding `{credential: {...}}`, `dialog_id`, `status`) + `ChatChannelService` and RESTful CRUD under `/api/v1/chat_channels`. - Channel framework under `api/channels/`: a `core` registry + per-channel packages that self-register a builder and implement a common `Channel` interface (`start`/`stop`/`send` + inbound normalization) over `IncomingMessage`/`OutgoingMessage`. - Embedded **reconcile loop** in `ragflow_server` (`api/channels/bootstrap.py`): loads enabled bots, and starts/stops/restarts them as rows change (no server restart needed). Inbound messages run the connected dialog via the non-streaming completion path, keeping per-end-user conversation history. - Missing optional channel SDKs degrade gracefully (channel skipped with a warning; others unaffected). Channel-level errors are logged, not crashed. - Feishu's WebSocket client runs in a dedicated thread with its own event loop to avoid cross-loop/contextvars conflicts with the channel runtime. **Frontend** - **Settings → Chat channels** panel: available-channels grid + configured-bots list with add/edit/delete and a **Connect assistant** popup that binds a bot to a dialog. - Brand icons via simple-icons / reused shared data-source assets, with colored fallbacks for brands not available. 
- Route, sidebar entry, i18n (en/zh), and a top-nav segment-boundary fix so the settings page no longer highlights the Chat tab. ### Type of change - [x] New Feature (non-breaking change which adds functionality) ### Notes - DB: new `chat_channel` table is auto-created; `chat_channel.dialog_id` is also covered by a `migrate_db` `alter_db_add_column` for existing installs. - Channel SDKs (`lark-oapi`, `discord.py`, `python-telegram-bot`, `line-bot-sdk`, `wechatpy`, `aiohttp`) added to dependencies. - Screenshots / per-channel credential docs to follow. <img width="1338" height="1290" alt="Image" src="https://github.com/user-attachments/assets/042cb2f9-0dad-4e6a-bcf7-43ced4bbd704" /> <img width="1344" height="738" alt="Image" src="https://github.com/user-attachments/assets/373cd08e-ec40-4c67-9c51-4d948b1ba617" /> <img width="672" height="887" alt="Image" src="https://github.com/user-attachments/assets/5a34953f-a9a3-4c1e-869e-5eff0dc64c84" /> ---------
231 lines
8.0 KiB
Python
231 lines
8.0 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
from aiohttp import web
|
|
from linebot.v3 import WebhookParser
|
|
from linebot.v3.exceptions import InvalidSignatureError
|
|
from linebot.v3.messaging import (
|
|
AsyncApiClient,
|
|
AsyncMessagingApi,
|
|
Configuration,
|
|
PushMessageRequest,
|
|
ReplyMessageRequest,
|
|
TextMessage,
|
|
)
|
|
from linebot.v3.webhooks import (
|
|
GroupSource,
|
|
MessageEvent,
|
|
RoomSource,
|
|
TextMessageContent,
|
|
UserSource,
|
|
)
|
|
|
|
from ..core.base import Channel, IncomingMessage, OutgoingMessage
|
|
from ..core.registry import register_channel
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class LineAccount:
    """Connection settings for a single LINE bot account.

    The two webhook fields select which local listener the account's
    webhook route is attached to.
    """

    # Identifier of the account; it is also the ``{account_id}`` segment of
    # the webhook path ``/line/{account_id}/webhook``.
    account_id: str
    # Secret handed to the LINE webhook parser for signature validation.
    channel_secret: str
    # Access token handed to the LINE messaging API client configuration.
    channel_access_token: str
    # Interface the webhook listener binds to (all interfaces by default).
    webhook_host: str = "0.0.0.0"
    # TCP port the webhook listener binds to.
    webhook_port: int = 3001
|
|
|
|
|
|
class _SharedWebhookServer:
    """One aiohttp listener shared by every LINE account on the same host/port.

    Requests arrive at ``POST /line/{account_id}/webhook`` and are routed to
    the channel registered under that ``account_id`` in ``self.channels``.
    """

    def __init__(self, host: str, port: int) -> None:
        """Build the application and its single webhook route (does not listen yet)."""
        self.host = host
        self.port = port
        self.app = web.Application()
        self.app.router.add_post("/line/{account_id}/webhook", self._handle_request)
        self.runner: Optional[web.AppRunner] = None
        self.site: Optional[web.TCPSite] = None
        # account_id -> channel that should receive that account's events.
        self.channels: Dict[str, "LineChannel"] = {}

    async def start(self) -> None:
        """Start listening; a no-op when the server is already started.

        Raises:
            Whatever aiohttp raises while setting up the runner or binding the
            socket. In that case the server is rolled back to the "not
            started" state so that a later ``start()`` can retry instead of
            silently doing nothing.
        """
        if self.runner is not None:
            return
        # Publish the runner before the first await so an overlapping start()
        # call hits the guard above instead of binding the port twice.
        runner = web.AppRunner(self.app)
        self.runner = runner
        try:
            await runner.setup()
            self.site = web.TCPSite(runner, self.host, self.port)
            await self.site.start()
        except BaseException:
            # Roll back: without this, ``self.runner`` would stay set and
            # every later start() would return early with nothing listening.
            self.runner = None
            self.site = None
            await runner.cleanup()
            raise
        LOGGER.info(
            "[line] webhook listening on http://%s:%s/line/<account_id>/webhook",
            self.host,
            self.port,
        )

    async def stop(self) -> None:
        """Stop listening and release the runner; safe to call when not started."""
        site, runner = self.site, self.runner
        # Reset state first so the object is reusable even if shutdown raises.
        self.site = None
        self.runner = None
        try:
            if site is not None:
                await site.stop()
        finally:
            # Always release the runner, even when stopping the site failed.
            if runner is not None:
                await runner.cleanup()

    async def _handle_request(self, request: web.Request) -> web.Response:
        """Validate one webhook call and hand its events to the owning channel.

        Returns:
            404 when no channel is registered for the account, 403 when the
            signature check fails, and 200 otherwise -- including when event
            handling or request processing raised, in which case the error is
            logged (best-effort acknowledgement).
        """
        account_id = request.match_info.get("account_id", "")
        # Reject unknown accounts before reading the request body.
        channel = self.channels.get(account_id)
        if channel is None:
            return web.Response(status=404, text="unknown account")
        try:
            body = await request.text()
            signature = request.headers.get("x-line-signature", "")
            try:
                events = channel.parser.parse(body, signature)
            except InvalidSignatureError:
                return web.Response(status=403, text="bad signature")
            for event in events:
                # One failing event must not prevent the remaining events in
                # the same delivery from being processed.
                try:
                    await channel.handle_event(event)
                except Exception:
                    LOGGER.error("[line:%s] event handling error", account_id, exc_info=True)
        except Exception:
            LOGGER.error("[line:%s] inbound request handling error", account_id, exc_info=True)
        return web.Response(status=200, text="ok")
|
|
|
|
|
|
# Webhook servers created by _acquire_server, keyed by (host, port).
_servers: Dict[Tuple[str, int], _SharedWebhookServer] = {}
|
|
# Outstanding acquisitions per (host, port): incremented by _acquire_server,
# decremented by _release_server.
_active_per_server: Dict[Tuple[str, int], int] = {}
|
|
|
|
|
|
async def _acquire_server(host: str, port: int) -> _SharedWebhookServer:
    """Return the shared webhook server for ``(host, port)``, starting it if new.

    Every successful call adds one reference that must later be paired with a
    call to ``_release_server`` for the same host and port.

    Raises:
        Whatever ``_SharedWebhookServer.start`` raises (for example when the
        port cannot be bound). No reference is counted in that case, and the
        failed server is removed from the registry so that the next call
        creates and starts a fresh one.
    """
    key = (host, port)
    server = _servers.get(key)
    if server is None:
        server = _SharedWebhookServer(host, port)
        _servers[key] = server
        try:
            await server.start()
        except BaseException:
            # Do not leave a server that never started in the registry:
            # later callers would receive it without start() being retried.
            if _servers.get(key) is server:
                del _servers[key]
            raise
    _active_per_server[key] = _active_per_server.get(key, 0) + 1
    return server
|
|
|
|
|
|
async def _release_server(host: str, port: int) -> None:
    """Give back one reference to the ``(host, port)`` server.

    When the last reference is returned (or the key was never counted), the
    bookkeeping for that key is dropped and the server, if any, is stopped.
    """
    slot = (host, port)
    left = _active_per_server.get(slot, 0) - 1
    if left > 0:
        # Other channels still use this listener; just record the new count.
        _active_per_server[slot] = left
        return
    _active_per_server.pop(slot, None)
    idle_server = _servers.pop(slot, None)
    if idle_server is not None:
        await idle_server.stop()
|
|
|
|
|
|
def _chat_type_and_id(source) -> Tuple[str, str]:
    """Map a LINE event source to a ``(chat_type, chat_id)`` pair.

    Group and room sources are both reported as ``"group"``, user sources as
    ``"p2p"``. Any other source falls back to its class name as the chat type
    and its ``user_id`` attribute (if present) as the id. A missing or empty
    id is returned as ``""``.
    """
    # (source class, chat type, attribute holding the chat id); checked in order.
    known_sources = (
        (GroupSource, "group", "group_id"),
        (RoomSource, "group", "room_id"),
        (UserSource, "p2p", "user_id"),
    )
    for source_cls, chat_type, id_attr in known_sources:
        if isinstance(source, source_cls):
            return (chat_type, getattr(source, id_attr) or "")
    return (type(source).__name__, getattr(source, "user_id", "") or "")
|
|
|
|
|
|
class LineChannel(Channel):
    """LINE channel: receives webhook events and sends text replies.

    Inbound text messages are normalized to ``IncomingMessage`` and passed to
    ``self._dispatch`` (presumably provided by ``Channel`` -- confirm in the
    base class). Outbound messages use the reply API when a reply token for
    the referenced inbound message is still cached, and the push API otherwise.
    """

    channel_id = "line"

    # Upper bound on cached reply tokens. A token is only removed when a reply
    # referencing its message id is sent, so without a bound the cache would
    # grow forever for messages that never get a reply.
    _MAX_PENDING_REPLY_TOKENS = 1024

    def __init__(self, account: LineAccount) -> None:
        """Prepare the webhook parser and API configuration for ``account``."""
        super().__init__()
        self.account = account
        self.account_id = account.account_id
        self.parser = WebhookParser(account.channel_secret)
        self._config = Configuration(access_token=account.channel_access_token)
        self._server: Optional[_SharedWebhookServer] = None
        # LINE reply tokens are single-use and expire ~30s after the event.
        # Maps inbound message id -> reply token, oldest entry first.
        self._reply_tokens: Dict[str, str] = {}

    def _remember_reply_token(self, message_id: str, reply_token: str) -> None:
        """Cache ``reply_token`` for ``message_id``, evicting the oldest entries when full."""
        tokens = self._reply_tokens
        # Re-insert so a repeated message id counts as the newest entry.
        tokens.pop(message_id, None)
        tokens[message_id] = reply_token
        while len(tokens) > self._MAX_PENDING_REPLY_TOKENS:
            # dicts iterate in insertion order, so the first key is the oldest.
            del tokens[next(iter(tokens))]

    async def start(self) -> None:
        """Attach this account to the shared webhook server; no-op if already started."""
        if self._server is not None:
            # A second acquire would take a server reference that stop(),
            # which releases only once, would never give back.
            return
        server = await _acquire_server(self.account.webhook_host, self.account.webhook_port)
        self._server = server
        server.channels[self.account_id] = self
        LOGGER.info(
            "[line:%s] registered at path /line/%s/webhook",
            self.account_id,
            self.account_id,
        )

    async def stop(self) -> None:
        """Detach from the shared webhook server and release its reference."""
        # Clear the handle first so a failing release is never attempted twice.
        server, self._server = self._server, None
        if server is None:
            return
        # Only unregister ourselves; another instance may have taken over the
        # same account id in the meantime.
        if server.channels.get(self.account_id) is self:
            del server.channels[self.account_id]
        await _release_server(self.account.webhook_host, self.account.webhook_port)

    async def handle_event(self, event) -> None:
        """Dispatch one webhook event; anything but a text message is ignored.

        Errors are logged and swallowed so one bad event cannot break the
        webhook request that delivered it.
        """
        try:
            if not isinstance(event, MessageEvent):
                return
            content = event.message
            if not isinstance(content, TextMessageContent):
                return
            chat_type, chat_id = _chat_type_and_id(event.source)
            sender_id = getattr(event.source, "user_id", "") or ""
            if event.reply_token:
                self._remember_reply_token(content.id, event.reply_token)
            incoming = IncomingMessage(
                channel=self.channel_id,
                account_id=self.account_id,
                chat_id=chat_id,
                chat_type=chat_type,
                message_id=content.id,
                sender_id=sender_id,
                text=content.text or "",
                raw=event,
            )
            await self._dispatch(incoming)
        except Exception:
            LOGGER.error("[line:%s] inbound message handling error", self.account_id, exc_info=True)

    async def send(self, message: OutgoingMessage) -> None:
        """Send ``message`` as text, replying when possible and pushing otherwise.

        A cached reply token for ``message.reply_to_message_id`` is consumed
        (tokens are single-use) and used with the reply API. Without one, the
        message is pushed to ``message.chat_id``. Failures are logged, not raised.
        """
        reply_token: Optional[str] = None
        if message.reply_to_message_id:
            reply_token = self._reply_tokens.pop(message.reply_to_message_id, None)

        try:
            async with AsyncApiClient(self._config) as api_client:
                api = AsyncMessagingApi(api_client)
                if reply_token:
                    await api.reply_message(
                        ReplyMessageRequest(
                            reply_token=reply_token,
                            messages=[TextMessage(text=message.text)],
                        )
                    )
                else:
                    if not message.chat_id:
                        LOGGER.error("[line:%s] no chat_id for push send", self.account_id)
                        return
                    await api.push_message(
                        PushMessageRequest(
                            to=message.chat_id,
                            messages=[TextMessage(text=message.text)],
                        )
                    )
        except Exception:
            LOGGER.error("[line:%s] send failed", self.account_id, exc_info=True)
|
|
|
|
|
|
def _build(account_id: str, cfg: dict) -> Channel:
    """Create a ``LineChannel`` from one account's credential/config mapping.

    Args:
        account_id: Identifier of the account; also used in the webhook path.
        cfg: Mapping with required ``channel_secret`` and
            ``channel_access_token`` and optional ``webhook_host`` /
            ``webhook_port``. A missing, ``None`` or blank optional value
            falls back to ``"0.0.0.0"`` / ``3001``.

    Raises:
        ValueError: If a required credential is missing or empty, or if
            ``webhook_port`` is not an integer in the range 0-65535.
    """
    channel_secret = cfg.get("channel_secret")
    channel_access_token = cfg.get("channel_access_token")
    if not channel_secret or not channel_access_token:
        raise ValueError(
            f"line account '{account_id}' missing channel_secret or channel_access_token"
        )

    # Treat explicit None / blank values like an absent key: str(None) would
    # otherwise yield the literal host "None", and int(None) / int("") raise.
    raw_host = cfg.get("webhook_host")
    if raw_host is None or not str(raw_host).strip():
        raw_host = "0.0.0.0"

    raw_port = cfg.get("webhook_port")
    if raw_port is None or not str(raw_port).strip():
        raw_port = 3001
    try:
        webhook_port = int(raw_port)
    except (TypeError, ValueError) as err:
        raise ValueError(
            f"line account '{account_id}' has invalid webhook_port {raw_port!r}"
        ) from err
    if not 0 <= webhook_port <= 65535:
        raise ValueError(
            f"line account '{account_id}' has out-of-range webhook_port {webhook_port}"
        )

    return LineChannel(
        LineAccount(
            account_id=account_id,
            channel_secret=str(channel_secret),
            channel_access_token=str(channel_access_token),
            webhook_host=str(raw_host),
            webhook_port=webhook_port,
        )
    )
|
|
|
|
|
|
# Runs at import time: hands the "line" name and its builder to the channel
# registry (presumably so "line" accounts are built via _build -- confirm in
# core.registry).
register_channel("line", _build)
|