Files
ragflow/api/ragflow_server.py
Kevin Hu b5a426e6e0 Feat: chat channels — connect assistants to external messaging bots (#15850)
### What problem does this PR solve?

#15844

Adds a **Chat channels** capability so a RAGFlow assistant (Dialog) can
be exposed as a bot on external messaging platforms (Feishu/Lark,
Discord, Telegram, Slack, WeCom, LINE, etc.). An admin configures a bot
in the UI, connects it to an assistant, and inbound messages are
answered from that assistant's knowledge base — replies are delivered
back on the channel.

**Feishu/Lark is implemented and tested end-to-end.** Discord, Telegram,
LINE, and WeCom are scaffolded against the same interface; the remaining
listed channels are tracked as follow-ups.

### Design

**Backend**
- New `chat_channel` table (`tenant_id`, `name`, `channel`, `config`
JSON holding `{credential: {...}}`, `dialog_id`, `status`) +
`ChatChannelService` and RESTful CRUD under `/api/v1/chat_channels`.
- Channel framework under `api/channels/`: a `core` registry +
per-channel packages that self-register a builder and implement a common
`Channel` interface (`start`/`stop`/`send` + inbound normalization) over
`IncomingMessage`/`OutgoingMessage`.
- Embedded **reconcile loop** in `ragflow_server`
(`api/channels/bootstrap.py`): loads enabled bots, and
starts/stops/restarts them as rows change (no server restart needed).
Inbound messages run the connected dialog via the non-streaming
completion path, keeping per-end-user conversation history.
- Missing optional channel SDKs degrade gracefully (channel skipped with
a warning; others unaffected). Channel-level errors are logged, not
crashed.
- Feishu's WebSocket client runs in a dedicated thread with its own
event loop to avoid cross-loop/contextvars conflicts with the channel
runtime.

**Frontend**
- **Settings → Chat channels** panel: available-channels grid +
configured-bots list with add/edit/delete and a **Connect assistant**
popup that binds a bot to a dialog.
- Brand icons via simple-icons / reused shared data-source assets, with
colored fallbacks for brands not available.
- Route, sidebar entry, i18n (en/zh), and a top-nav segment-boundary fix
so the settings page no longer highlights the Chat tab.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

### Notes
- DB: new `chat_channel` table is auto-created; `chat_channel.dialog_id`
is also covered by a `migrate_db` `alter_db_add_column` for existing
installs.
- Channel SDKs (`lark-oapi`, `discord.py`, `python-telegram-bot`,
`line-bot-sdk`, `wechatpy`, `aiohttp`) added to dependencies.
- Screenshots / per-channel credential docs to follow.

<img width="1338" height="1290" alt="Image"
src="https://github.com/user-attachments/assets/042cb2f9-0dad-4e6a-bcf7-43ced4bbd704"
/>

<img width="1344" height="738" alt="Image"
src="https://github.com/user-attachments/assets/373cd08e-ec40-4c67-9c51-4d948b1ba617"
/>

<img width="672" height="887" alt="Image"
src="https://github.com/user-attachments/assets/5a34953f-a9a3-4c1e-869e-5eff0dc64c84"
/>

---------
2026-06-12 18:21:30 +08:00

175 lines
5.7 KiB
Python

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
print("Start RAGFlow server...")
import time
start_ts = time.time()
import os
# LiteLLM fetches a model cost map from GitHub during import unless this is set.
# The API server should not block startup on external network access.
os.environ.setdefault("LITELLM_LOCAL_MODEL_COST_MAP", "True")
import logging
import signal
import sys
import threading
import uuid
import faulthandler
from api.apps import app
from api.db.runtime_config import RuntimeConfig
from api.db.services.document_service import DocumentService
from common.file_utils import get_project_base_directory
from common import settings
from api.db.db_models import init_database_tables as init_web_db
from api.db.init_data import init_web_data, init_superuser
from common.versions import get_ragflow_version
from common.config_utils import show_configs
from common.mcp_tool_call_conn import shutdown_all_mcp_sessions
from common.log_utils import init_root_logger
from agent.plugin import GlobalPluginManager
from rag.utils.redis_conn import RedisDistributedLock
stop_event = threading.Event()
RAGFLOW_DEBUGPY_LISTEN = int(os.environ.get('RAGFLOW_DEBUGPY_LISTEN', "0"))
def update_progress():
lock_value = str(uuid.uuid4())
redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60)
logging.info(f"update_progress lock_value: {lock_value}")
while not stop_event.is_set():
try:
if redis_lock.acquire():
DocumentService.update_progress()
redis_lock.release()
except Exception:
logging.exception("update_progress exception")
finally:
try:
redis_lock.release()
except Exception:
logging.exception("update_progress exception")
stop_event.wait(6)
def signal_handler(sig, frame):
logging.info("Received interrupt signal, shutting down...")
shutdown_all_mcp_sessions()
stop_event.set()
stop_event.wait(1)
sys.exit(0)
if __name__ == '__main__':
faulthandler.enable()
init_root_logger("ragflow_server")
logging.info(r"""
____ ___ ______ ______ __
/ __ \ / | / ____// ____// /____ _ __
/ /_/ // /| | / / __ / /_ / // __ \| | /| / /
/ _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ /
/_/ |_|/_/ |_|\____//_/ /_/ \____/ |__/|__/
""")
logging.info(
f'RAGFlow version: {get_ragflow_version()}'
)
logging.info(
f'project base: {get_project_base_directory()}'
)
show_configs()
settings.init_settings()
settings.print_rag_settings()
if RAGFLOW_DEBUGPY_LISTEN > 0:
logging.info(f"debugpy listen on {RAGFLOW_DEBUGPY_LISTEN}")
import debugpy
debugpy.listen(("0.0.0.0", RAGFLOW_DEBUGPY_LISTEN))
# init db
init_web_db()
init_web_data()
# init runtime config
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--version", default=False, help="RAGFlow version", action="store_true"
)
parser.add_argument(
"--debug", default=False, help="debug mode", action="store_true"
)
parser.add_argument(
"--init-superuser", default=False, help="init superuser", action="store_true"
)
args = parser.parse_args()
if args.version:
print(get_ragflow_version())
sys.exit(0)
if args.init_superuser:
init_superuser()
RuntimeConfig.DEBUG = args.debug
if RuntimeConfig.DEBUG:
logging.info("run on debug mode")
RuntimeConfig.init_env()
RuntimeConfig.init_config(JOB_SERVER_HOST=settings.HOST_IP, HTTP_PORT=settings.HOST_PORT)
GlobalPluginManager.load_plugins()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def delayed_start_update_progress():
logging.info("Starting update_progress thread (delayed)")
t = threading.Thread(target=update_progress, daemon=True)
t.start()
def start_chat_channels():
try:
from api.channels.bootstrap import start_channel_server
logging.info("Starting chat channel server thread")
t = threading.Thread(
target=start_channel_server,
args=(stop_event,),
daemon=True,
name="chat-channels",
)
t.start()
except Exception:
logging.exception("Failed to start chat channel server")
if RuntimeConfig.DEBUG:
if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
threading.Timer(1.0, delayed_start_update_progress).start()
start_chat_channels()
else:
threading.Timer(1.0, delayed_start_update_progress).start()
start_chat_channels()
# start http server
try:
logging.info(f"RAGFlow server is ready after {time.time() - start_ts}s initialization.")
app.run(host=settings.HOST_IP, port=settings.HOST_PORT, use_reloader=RuntimeConfig.DEBUG, debug=False)
except Exception as e:
logging.exception(f"Unhandled exception: {e}")
stop_event.set()
stop_event.wait(1)
os.kill(os.getpid(), signal.SIGKILL)