mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
feat: add whatsapp web qr chat channel (#16238)
Adds a WhatsApp chat channel backed by a QR-based web login flow so users can connect without manual token setup.
This commit is contained in:
@@ -33,7 +33,7 @@ def _chat_channel_auth_error(channel_id: str, user_id: str):
|
||||
|
||||
@manager.route("/chat-channels", methods=["POST"]) # noqa: F821
|
||||
@login_required
|
||||
@validate_request("name", "channel", "config")
|
||||
@validate_request("name", "channel")
|
||||
async def create_chat_channel():
|
||||
"""Create a chat channel bot owned by the current tenant."""
|
||||
req = await get_request_json()
|
||||
@@ -42,7 +42,7 @@ async def create_chat_channel():
|
||||
"tenant_id": current_user.id,
|
||||
"name": req["name"],
|
||||
"channel": req["channel"],
|
||||
"config": req["config"],
|
||||
"config": req.get("config") or {},
|
||||
"chat_id": req.get("chat_id") or None
|
||||
}
|
||||
ChatChannelService.insert(**channel)
|
||||
@@ -115,3 +115,43 @@ def rm_chat_channel(channel_id):
|
||||
|
||||
ChatChannelService.delete_by_id(channel_id)
|
||||
return get_json_result(data=True)
|
||||
|
||||
|
||||
@manager.route("/chat-channels/<channel_id>/runtime", methods=["GET"]) # noqa: F821
|
||||
@login_required
|
||||
def get_chat_channel_runtime(channel_id):
|
||||
"""Return live runtime metadata for a running chat channel."""
|
||||
if not ChatChannelService.accessible(channel_id, current_user.id):
|
||||
return _chat_channel_auth_error(channel_id, current_user.id)
|
||||
|
||||
e, conn = ChatChannelService.get_by_id(channel_id)
|
||||
if not e:
|
||||
return get_data_error_result(message="Can't find this chat channel!")
|
||||
|
||||
if conn.channel != "whatsapp":
|
||||
return get_data_error_result(message="Runtime snapshot is only available for WhatsApp.")
|
||||
|
||||
try:
|
||||
from api.channels.whatsapp.channel import get_runtime_snapshot
|
||||
except Exception as ex:
|
||||
LOGGER.error("failed to load whatsapp runtime helper: %s", ex, exc_info=True)
|
||||
return get_data_error_result(message="WhatsApp runtime is unavailable.")
|
||||
|
||||
snapshot = get_runtime_snapshot(channel_id)
|
||||
if snapshot is None:
|
||||
return get_json_result(
|
||||
data={
|
||||
"account_id": channel_id,
|
||||
"session_key": channel_id,
|
||||
"status": "waiting",
|
||||
"connected_at": None,
|
||||
"qr_updated_at": None,
|
||||
"qr_data_url": None,
|
||||
"last_error": None,
|
||||
"session_id": None,
|
||||
"last_snapshot_at": None,
|
||||
"gateway_base_url": None,
|
||||
"event_cursor": 0,
|
||||
}
|
||||
)
|
||||
return get_json_result(data=snapshot)
|
||||
|
||||
@@ -33,7 +33,16 @@ import threading
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Channel packages bundled under api/channels that self-register on import.
|
||||
_BUNDLED_CHANNELS = ("feishu", "discord", "telegram", "line", "wecom", "qqbot", "dingtalk")
|
||||
_BUNDLED_CHANNELS = (
|
||||
"feishu",
|
||||
"discord",
|
||||
"telegram",
|
||||
"line",
|
||||
"wecom",
|
||||
"qqbot",
|
||||
"dingtalk",
|
||||
"whatsapp",
|
||||
)
|
||||
|
||||
# How often (seconds) to reconcile running channels against the database.
|
||||
_RECONCILE_INTERVAL_SECS = 10
|
||||
@@ -144,7 +153,10 @@ def _make_chat_handler(ch):
|
||||
|
||||
answer_text = ""
|
||||
try:
|
||||
async for ans in async_chat(dia, history, False, quote=False):
|
||||
chat_kwargs = {"quote": False}
|
||||
if "{knowledge}" in (dia.prompt_config or {}).get("system", ""):
|
||||
chat_kwargs["knowledge"] = ""
|
||||
async for ans in async_chat(dia, history, False, **chat_kwargs):
|
||||
structure_answer(conv, ans, message_id, conv.id)
|
||||
answer_text = (ans or {}).get("answer", "") or ""
|
||||
ConversationService.update_by_id(conv.id, conv.to_dict())
|
||||
@@ -228,6 +240,18 @@ async def _reconcile(running: dict, failed: dict) -> None:
|
||||
if account_id not in desired or desired[account_id][2] != failed[account_id]:
|
||||
failed.pop(account_id, None)
|
||||
|
||||
active_whatsapp = any(channel == "whatsapp" for channel, _, _ in desired.values())
|
||||
if not active_whatsapp:
|
||||
active_whatsapp = any(
|
||||
entry["ch"].channel_id == "whatsapp" for entry in running.values()
|
||||
)
|
||||
from api.channels.whatsapp.gateway import sync_whatsapp_gateway
|
||||
|
||||
try:
|
||||
await sync_whatsapp_gateway(active_whatsapp)
|
||||
except Exception:
|
||||
LOGGER.exception("failed to sync WhatsApp gateway enabled=%s", active_whatsapp)
|
||||
|
||||
# Start channels that are new (skip ones already known to fail with this config).
|
||||
for account_id, (channel, credential, fp) in desired.items():
|
||||
if account_id in running or failed.get(account_id) == fp:
|
||||
|
||||
1
api/channels/whatsapp/__init__.py
Normal file
1
api/channels/whatsapp/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from . import channel # noqa: F401
|
||||
393
api/channels/whatsapp/channel.py
Normal file
393
api/channels/whatsapp/channel.py
Normal file
@@ -0,0 +1,393 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Optional
|
||||
|
||||
import aiohttp
|
||||
|
||||
from ..core.base import Channel, IncomingMessage, OutgoingMessage
|
||||
from ..core.registry import register_channel
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
WHATSAPP_DEFAULT_TIMEOUT_SECS = 30
|
||||
WHATSAPP_DEFAULT_SESSION_KEY = "default"
|
||||
WHATSAPP_MESSAGE_TTL_SECS = 3600
|
||||
WHATSAPP_GATEWAY_START_RETRY_SECS = 30
|
||||
WHATSAPP_GATEWAY_RECONNECT_SECS = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class WhatsAppAccount:
|
||||
account_id: str
|
||||
gateway_base_url: str
|
||||
gateway_token: str = ""
|
||||
session_key: str = ""
|
||||
timeout_secs: int = WHATSAPP_DEFAULT_TIMEOUT_SECS
|
||||
|
||||
|
||||
_live_channels: dict[str, "WhatsAppChannel"] = {}
|
||||
|
||||
|
||||
def _default_gateway_base_url() -> str:
|
||||
return "http://127.0.0.1:3005"
|
||||
|
||||
|
||||
def get_runtime_snapshot(account_id: str) -> dict[str, Any] | None:
|
||||
channel = _live_channels.get(account_id)
|
||||
if channel is None:
|
||||
return None
|
||||
return channel.get_status_snapshot()
|
||||
|
||||
|
||||
class WhatsAppChannel(Channel):
|
||||
channel_id = "whatsapp"
|
||||
|
||||
def __init__(self, account: WhatsAppAccount) -> None:
|
||||
super().__init__()
|
||||
self.account = account
|
||||
self.account_id = account.account_id
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._stop_event = asyncio.Event()
|
||||
self._lifecycle_lock = asyncio.Lock()
|
||||
self._http_session: Optional[aiohttp.ClientSession] = None
|
||||
self._status: str = "stopped"
|
||||
self._last_error: str = ""
|
||||
self._qr_data_url: str = ""
|
||||
self._qr_updated_at: float = 0.0
|
||||
self._connected_at: float = 0.0
|
||||
self._last_snapshot_at: float = 0.0
|
||||
self._session_id: str = ""
|
||||
self._event_cursor: int = 0
|
||||
self._seen_message_ids: dict[str, float] = {}
|
||||
|
||||
def _session_key(self) -> str:
|
||||
value = str(self.account.session_key or "").strip()
|
||||
return value or WHATSAPP_DEFAULT_SESSION_KEY
|
||||
|
||||
def _gateway_base_url(self) -> str:
|
||||
return str(self.account.gateway_base_url or "").strip().rstrip("/")
|
||||
|
||||
def _gateway_url(self, path: str) -> str:
|
||||
base_url = self._gateway_base_url()
|
||||
if not base_url:
|
||||
raise ValueError("WhatsApp gateway_base_url is required")
|
||||
return f"{base_url}/{path.lstrip('/')}"
|
||||
|
||||
def _events_ws_url(self) -> str:
|
||||
base_url = self._gateway_base_url()
|
||||
if not base_url:
|
||||
raise ValueError("WhatsApp gateway_base_url is required")
|
||||
if base_url.startswith("http://"):
|
||||
ws_base = f"ws://{base_url.removeprefix('http://')}"
|
||||
elif base_url.startswith("https://"):
|
||||
ws_base = f"wss://{base_url.removeprefix('https://')}"
|
||||
else:
|
||||
ws_base = base_url
|
||||
return (
|
||||
f"{ws_base}/whatsapp/{self._session_key()}/events/ws"
|
||||
f"?after={self._event_cursor}"
|
||||
)
|
||||
|
||||
def _gateway_headers(self) -> dict[str, str]:
|
||||
token = str(self.account.gateway_token or "").strip()
|
||||
if not token:
|
||||
return {}
|
||||
if token.lower().startswith("bearer "):
|
||||
return {"Authorization": token}
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
async def _ensure_http_session(self) -> aiohttp.ClientSession:
|
||||
if self._http_session is not None and not self._http_session.closed:
|
||||
return self._http_session
|
||||
timeout = aiohttp.ClientTimeout(total=max(int(self.account.timeout_secs), 1))
|
||||
self._http_session = aiohttp.ClientSession(timeout=timeout)
|
||||
return self._http_session
|
||||
|
||||
async def _request_json(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
payload: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
session = await self._ensure_http_session()
|
||||
url = self._gateway_url(path)
|
||||
async with session.request(
|
||||
method,
|
||||
url,
|
||||
json=payload,
|
||||
headers=self._gateway_headers(),
|
||||
) as resp:
|
||||
text = await resp.text()
|
||||
if resp.status >= 400:
|
||||
raise RuntimeError(f"status: {resp.status}, response: {text}")
|
||||
if not text.strip():
|
||||
return {}
|
||||
content_type = resp.headers.get("content-type", "")
|
||||
if "application/json" not in content_type.lower():
|
||||
raise RuntimeError(
|
||||
f"unexpected response content-type: {content_type or 'unknown'}, response: {text[:200]}"
|
||||
)
|
||||
try:
|
||||
return await resp.json()
|
||||
except Exception as ex:
|
||||
raise RuntimeError(f"invalid json response: {text[:200]}") from ex
|
||||
|
||||
async def _request_json_with_retry(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
payload: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
deadline = time.time() + WHATSAPP_GATEWAY_START_RETRY_SECS
|
||||
last_error: Exception | None = None
|
||||
while time.time() < deadline and not self._stop_event.is_set():
|
||||
try:
|
||||
return await self._request_json(method, path, payload)
|
||||
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as ex:
|
||||
last_error = ex
|
||||
await asyncio.sleep(0.5)
|
||||
if last_error is not None:
|
||||
raise last_error
|
||||
return await self._request_json(method, path, payload)
|
||||
|
||||
async def start(self) -> None:
|
||||
async with self._lifecycle_lock:
|
||||
if self._task and not self._task.done():
|
||||
return
|
||||
self._stop_event.clear()
|
||||
_live_channels[self.account_id] = self
|
||||
self._task = asyncio.create_task(self._run(), name=f"whatsapp-{self.account_id}")
|
||||
LOGGER.info("[whatsapp:%s] starting gateway client", self.account_id)
|
||||
|
||||
async def stop(self) -> None:
|
||||
async with self._lifecycle_lock:
|
||||
self._stop_event.set()
|
||||
task = self._task
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
try:
|
||||
await self._request_json(
|
||||
"POST",
|
||||
f"whatsapp/{self._session_key()}/stop",
|
||||
{"account_id": self.account_id, "session_key": self._session_key()},
|
||||
)
|
||||
except Exception:
|
||||
LOGGER.debug("[whatsapp:%s] gateway stop failed", self.account_id, exc_info=True)
|
||||
|
||||
try:
|
||||
if self._http_session is not None and not self._http_session.closed:
|
||||
await self._http_session.close()
|
||||
finally:
|
||||
self._http_session = None
|
||||
|
||||
_live_channels.pop(self.account_id, None)
|
||||
self._task = None
|
||||
self._status = "stopped"
|
||||
self._last_error = ""
|
||||
self._qr_data_url = ""
|
||||
self._qr_updated_at = 0.0
|
||||
self._connected_at = 0.0
|
||||
self._last_snapshot_at = 0.0
|
||||
self._session_id = ""
|
||||
self._event_cursor = 0
|
||||
self._seen_message_ids.clear()
|
||||
|
||||
async def send(self, message: OutgoingMessage) -> None:
|
||||
if not message.chat_id:
|
||||
LOGGER.error("[whatsapp:%s] missing chat_id; cannot send", self.account_id)
|
||||
return
|
||||
try:
|
||||
LOGGER.info(
|
||||
"[whatsapp:%s] sending reply chat_id=%s reply_to=%s text_preview=%r",
|
||||
self.account_id,
|
||||
message.chat_id,
|
||||
message.reply_to_message_id,
|
||||
message.text[:120],
|
||||
)
|
||||
await self._request_json(
|
||||
"POST",
|
||||
f"whatsapp/{self._session_key()}/send",
|
||||
{
|
||||
"account_id": self.account_id,
|
||||
"session_key": self._session_key(),
|
||||
"chat_id": message.chat_id,
|
||||
"text": message.text,
|
||||
"reply_to_message_id": message.reply_to_message_id,
|
||||
},
|
||||
)
|
||||
LOGGER.info(
|
||||
"[whatsapp:%s] message sent chat_id=%s reply_to=%s",
|
||||
self.account_id,
|
||||
message.chat_id,
|
||||
message.reply_to_message_id,
|
||||
)
|
||||
except Exception:
|
||||
LOGGER.error("[whatsapp:%s] send failed", self.account_id, exc_info=True)
|
||||
|
||||
async def _run(self) -> None:
|
||||
self._status = "connecting"
|
||||
self._last_error = ""
|
||||
try:
|
||||
await self._request_json_with_retry(
|
||||
"POST",
|
||||
f"whatsapp/{self._session_key()}/start",
|
||||
{
|
||||
"account_id": self.account_id,
|
||||
"session_key": self._session_key(),
|
||||
"gateway_base_url": self._gateway_base_url(),
|
||||
},
|
||||
)
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
await self._run_events_ws()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as ex:
|
||||
self._status = "error"
|
||||
self._last_error = str(ex)
|
||||
LOGGER.error("[whatsapp:%s] gateway event loop error", self.account_id, exc_info=True)
|
||||
if not self._stop_event.is_set():
|
||||
await asyncio.sleep(WHATSAPP_GATEWAY_RECONNECT_SECS)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as ex:
|
||||
self._status = "error"
|
||||
self._last_error = str(ex)
|
||||
LOGGER.error("[whatsapp:%s] gateway runtime error", self.account_id, exc_info=True)
|
||||
finally:
|
||||
if not self._stop_event.is_set() and self._status != "error":
|
||||
self._status = "disconnected"
|
||||
if not self._last_error:
|
||||
self._last_error = "WhatsApp gateway stopped unexpectedly."
|
||||
self._last_snapshot_at = time.time()
|
||||
|
||||
def _apply_snapshot(self, snapshot: dict[str, Any]) -> None:
|
||||
self._last_snapshot_at = time.time()
|
||||
self._status = str(snapshot.get("status") or "connecting")
|
||||
self._last_error = str(snapshot.get("last_error") or "")
|
||||
self._qr_data_url = str(snapshot.get("qr_data_url") or "")
|
||||
self._qr_updated_at = float(snapshot.get("qr_updated_at") or 0.0)
|
||||
self._connected_at = float(snapshot.get("connected_at") or 0.0)
|
||||
self._session_id = str(snapshot.get("session_id") or self._session_id or "")
|
||||
self._event_cursor = max(self._event_cursor, int(snapshot.get("event_cursor") or 0))
|
||||
if self._status == "connected":
|
||||
self._last_error = ""
|
||||
|
||||
async def _run_events_ws(self) -> None:
|
||||
session = await self._ensure_http_session()
|
||||
url = self._events_ws_url()
|
||||
async with session.ws_connect(url, headers=self._gateway_headers(), heartbeat=None) as ws:
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
await self._handle_ws_payload(msg.data)
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
raise RuntimeError(f"whatsapp events websocket error: {ws.exception()}")
|
||||
LOGGER.warning("[whatsapp:%s] events websocket closed", self.account_id)
|
||||
|
||||
async def _handle_ws_payload(self, payload: str) -> None:
|
||||
try:
|
||||
obj = json.loads(payload)
|
||||
except json.JSONDecodeError:
|
||||
LOGGER.warning(
|
||||
"[whatsapp:%s] ignored invalid websocket payload: %r",
|
||||
self.account_id,
|
||||
payload[:200],
|
||||
)
|
||||
return
|
||||
|
||||
kind = str(obj.get("type") or "").strip()
|
||||
data = obj.get("data")
|
||||
if kind == "snapshot" and isinstance(data, dict):
|
||||
self._apply_snapshot(data)
|
||||
return
|
||||
if kind == "event" and isinstance(data, dict):
|
||||
await self._handle_event_item(data)
|
||||
return
|
||||
|
||||
async def _handle_event_item(self, item: dict[str, Any]) -> None:
|
||||
seq = int(item.get("seq") or 0)
|
||||
if seq > self._event_cursor:
|
||||
self._event_cursor = seq
|
||||
if item.get("kind") != "message":
|
||||
return
|
||||
message_id = str(item.get("message_id") or "").strip()
|
||||
if not message_id:
|
||||
return
|
||||
self._prune_seen_message_ids()
|
||||
if self._is_seen_message(message_id):
|
||||
return
|
||||
self._seen_message_ids[message_id] = time.time()
|
||||
incoming = IncomingMessage(
|
||||
channel=self.channel_id,
|
||||
account_id=self.account_id,
|
||||
chat_id=str(item.get("chat_id") or ""),
|
||||
chat_type=str(item.get("chat_type") or "p2p"),
|
||||
message_id=message_id,
|
||||
sender_id=str(item.get("sender_id") or ""),
|
||||
text=str(item.get("text") or ""),
|
||||
raw=item.get("raw"),
|
||||
)
|
||||
LOGGER.info(
|
||||
"[whatsapp:%s] inbound message_id=%s chat_id=%s",
|
||||
self.account_id,
|
||||
message_id,
|
||||
incoming.chat_id,
|
||||
)
|
||||
await self._dispatch(incoming)
|
||||
|
||||
def _is_seen_message(self, message_id: str) -> bool:
|
||||
return message_id in self._seen_message_ids
|
||||
|
||||
def _prune_seen_message_ids(self) -> None:
|
||||
cutoff = time.time() - WHATSAPP_MESSAGE_TTL_SECS
|
||||
stale = [key for key, ts in self._seen_message_ids.items() if ts < cutoff]
|
||||
for key in stale:
|
||||
self._seen_message_ids.pop(key, None)
|
||||
|
||||
def get_status_snapshot(self) -> dict[str, Any]:
|
||||
return {
|
||||
"account_id": self.account_id,
|
||||
"session_key": self._session_key(),
|
||||
"status": self._status,
|
||||
"connected_at": self._connected_at or None,
|
||||
"qr_updated_at": self._qr_updated_at or None,
|
||||
"qr_data_url": self._qr_data_url or None,
|
||||
"last_error": self._last_error or None,
|
||||
"session_id": self._session_id or None,
|
||||
"last_snapshot_at": self._last_snapshot_at or None,
|
||||
"gateway_base_url": self._gateway_base_url() or None,
|
||||
"event_cursor": self._event_cursor,
|
||||
}
|
||||
|
||||
|
||||
def _build(account_id: str, cfg: dict) -> Channel:
|
||||
gateway_base_url = str(
|
||||
cfg.get("gateway_base_url")
|
||||
or cfg.get("gateway_url")
|
||||
or cfg.get("control_url")
|
||||
or _default_gateway_base_url()
|
||||
)
|
||||
gateway_token = str(cfg.get("gateway_token") or cfg.get("token") or "")
|
||||
session_key = str(cfg.get("session_key") or cfg.get("session_id") or account_id)
|
||||
timeout_secs = int(cfg.get("timeout_secs") or WHATSAPP_DEFAULT_TIMEOUT_SECS)
|
||||
return WhatsAppChannel(
|
||||
WhatsAppAccount(
|
||||
account_id=account_id,
|
||||
gateway_base_url=gateway_base_url,
|
||||
gateway_token=gateway_token,
|
||||
session_key=session_key,
|
||||
timeout_secs=timeout_secs,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
register_channel("whatsapp", _build)
|
||||
34
api/channels/whatsapp/gateway-node/README.md
Normal file
34
api/channels/whatsapp/gateway-node/README.md
Normal file
@@ -0,0 +1,34 @@
|
||||
# RAGFlow WhatsApp Gateway
|
||||
|
||||
This directory contains a minimal WhatsApp gateway built on top of
|
||||
`@whiskeysockets/baileys`.
|
||||
|
||||
## Install
|
||||
|
||||
```bash
|
||||
cd api/channels/whatsapp/gateway-node
|
||||
npm install
|
||||
```
|
||||
|
||||
## Run
|
||||
|
||||
```bash
|
||||
WHATSAPP_GATEWAY_PORT=3005 \
|
||||
WHATSAPP_GATEWAY_DATA_DIR=~/.ragflow/whatsapp-gateway \
|
||||
npm start
|
||||
```
|
||||
|
||||
## API
|
||||
|
||||
- `POST /whatsapp/:sessionKey/start`
|
||||
- `GET /whatsapp/:sessionKey/status`
|
||||
- `GET /whatsapp/:sessionKey/events/ws?after=<seq>` (WebSocket)
|
||||
- `POST /whatsapp/:sessionKey/send`
|
||||
- `POST /whatsapp/:sessionKey/stop`
|
||||
|
||||
## Notes
|
||||
|
||||
- Authentication state is persisted under `WHATSAPP_GATEWAY_DATA_DIR`.
|
||||
- Scan the QR code exposed in `status.qr_data_url`.
|
||||
- RAGFlow polls `status` and `events` and forwards inbound messages to the
|
||||
connected assistant.
|
||||
533
api/channels/whatsapp/gateway-node/index.js
Normal file
533
api/channels/whatsapp/gateway-node/index.js
Normal file
@@ -0,0 +1,533 @@
|
||||
import http from 'node:http';
|
||||
import crypto from 'node:crypto';
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
import { mkdir } from 'node:fs/promises';
|
||||
|
||||
import makeWASocket, {
|
||||
Browsers,
|
||||
DisconnectReason,
|
||||
fetchLatestBaileysVersion,
|
||||
useMultiFileAuthState,
|
||||
} from 'baileys';
|
||||
import QRCode from 'qrcode';
|
||||
|
||||
const PORT = Number.parseInt(process.env.WHATSAPP_GATEWAY_PORT || '3005', 10);
|
||||
const HOST = process.env.WHATSAPP_GATEWAY_HOST || '127.0.0.1';
|
||||
const AUTH_TOKEN = String(process.env.WHATSAPP_GATEWAY_TOKEN || '').trim();
|
||||
const WS_MAGIC = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
|
||||
const DATA_DIR =
|
||||
process.env.WHATSAPP_GATEWAY_DATA_DIR ||
|
||||
path.join(os.homedir(), '.ragflow', 'whatsapp-gateway');
|
||||
|
||||
function now() {
|
||||
return Date.now() / 1000;
|
||||
}
|
||||
|
||||
function normalizeJid(chatId) {
|
||||
const raw = String(chatId || '').trim();
|
||||
if (!raw) {
|
||||
return '';
|
||||
}
|
||||
if (raw.includes('@')) {
|
||||
return raw;
|
||||
}
|
||||
const digits = raw.replace(/\D/g, '');
|
||||
if (!digits) {
|
||||
return '';
|
||||
}
|
||||
return `${digits}@s.whatsapp.net`;
|
||||
}
|
||||
|
||||
function detectChatType(jid) {
|
||||
if (jid.endsWith('@g.us')) {
|
||||
return 'group';
|
||||
}
|
||||
if (jid.endsWith('@newsletter')) {
|
||||
return 'channel';
|
||||
}
|
||||
return 'dm';
|
||||
}
|
||||
|
||||
function extractText(message) {
|
||||
if (!message) {
|
||||
return '';
|
||||
}
|
||||
return (
|
||||
message.conversation ||
|
||||
message.extendedTextMessage?.text ||
|
||||
message.imageMessage?.caption ||
|
||||
message.videoMessage?.caption ||
|
||||
message.documentMessage?.caption ||
|
||||
message.buttonsResponseMessage?.selectedButtonId ||
|
||||
message.listResponseMessage?.title ||
|
||||
''
|
||||
).trim();
|
||||
}
|
||||
|
||||
function safeMessageKey(message) {
|
||||
return {
|
||||
remoteJid: message.key?.remoteJid || '',
|
||||
fromMe: Boolean(message.key?.fromMe),
|
||||
id: message.key?.id || '',
|
||||
participant: message.key?.participant || '',
|
||||
};
|
||||
}
|
||||
|
||||
function buildWsFrame(text) {
|
||||
const data = Buffer.from(String(text), 'utf8');
|
||||
let header;
|
||||
if (data.length < 126) {
|
||||
header = Buffer.alloc(2);
|
||||
header[1] = data.length;
|
||||
} else if (data.length < 65536) {
|
||||
header = Buffer.alloc(4);
|
||||
header[1] = 126;
|
||||
header.writeUInt16BE(data.length, 2);
|
||||
} else {
|
||||
header = Buffer.alloc(10);
|
||||
header[1] = 127;
|
||||
header.writeBigUInt64BE(BigInt(data.length), 2);
|
||||
}
|
||||
header[0] = 0x81;
|
||||
return Buffer.concat([header, data]);
|
||||
}
|
||||
|
||||
function sendWsText(socket, payload) {
|
||||
socket.write(buildWsFrame(JSON.stringify(payload)));
|
||||
}
|
||||
|
||||
function isAuthorized(req) {
|
||||
if (!AUTH_TOKEN) {
|
||||
return true;
|
||||
}
|
||||
const auth = String(req.headers.authorization || '').trim();
|
||||
return auth === `Bearer ${AUTH_TOKEN}`;
|
||||
}
|
||||
|
||||
class WhatsAppSession {
|
||||
constructor(sessionKey) {
|
||||
this.sessionKey = sessionKey;
|
||||
this.sessionDir = path.join(DATA_DIR, sessionKey);
|
||||
this.status = 'stopped';
|
||||
this.lastError = '';
|
||||
this.qrDataUrl = '';
|
||||
this.qrUpdatedAt = 0;
|
||||
this.connectedAt = 0;
|
||||
this.sessionId = '';
|
||||
this.authRegistered = false;
|
||||
this.lastSnapshotAt = 0;
|
||||
this.eventSeq = 0;
|
||||
this.events = [];
|
||||
this.messageStore = new Map();
|
||||
this.sock = null;
|
||||
this.saveCreds = null;
|
||||
this.starting = null;
|
||||
this.stopping = false;
|
||||
this.restartTimer = null;
|
||||
this.subscribers = new Set();
|
||||
}
|
||||
|
||||
addSubscriber(socket, afterSeq) {
|
||||
const subscriber = { socket };
|
||||
this.subscribers.add(subscriber);
|
||||
socket.on('close', () => {
|
||||
this.subscribers.delete(subscriber);
|
||||
});
|
||||
socket.on('error', () => {
|
||||
this.subscribers.delete(subscriber);
|
||||
});
|
||||
|
||||
sendWsText(socket, { type: 'snapshot', data: this.snapshot() });
|
||||
const backlog = this.listEvents(afterSeq);
|
||||
for (const event of backlog.items) {
|
||||
sendWsText(socket, { type: 'event', data: event });
|
||||
}
|
||||
}
|
||||
|
||||
broadcast(message) {
|
||||
for (const subscriber of this.subscribers) {
|
||||
try {
|
||||
sendWsText(subscriber.socket, message);
|
||||
} catch {
|
||||
this.subscribers.delete(subscriber);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
snapshot() {
|
||||
return {
|
||||
session_key: this.sessionKey,
|
||||
status: this.status,
|
||||
last_error: this.lastError || null,
|
||||
qr_data_url: this.qrDataUrl || null,
|
||||
qr_updated_at: this.qrUpdatedAt || null,
|
||||
connected_at: this.connectedAt || null,
|
||||
session_id: this.sessionId || null,
|
||||
auth_registered: this.authRegistered,
|
||||
last_snapshot_at: this.lastSnapshotAt || null,
|
||||
event_cursor: this.eventSeq,
|
||||
event_queue_size: this.events.length,
|
||||
};
|
||||
}
|
||||
|
||||
listEvents(afterSeq) {
|
||||
const after = Number.isFinite(afterSeq) ? afterSeq : 0;
|
||||
return {
|
||||
next_cursor: this.eventSeq,
|
||||
items: this.events.filter((event) => event.seq > after),
|
||||
};
|
||||
}
|
||||
|
||||
async start() {
|
||||
if (this.starting) {
|
||||
return this.starting;
|
||||
}
|
||||
if (this.sock) {
|
||||
return;
|
||||
}
|
||||
this.stopping = false;
|
||||
this.starting = this._start().finally(() => {
|
||||
this.starting = null;
|
||||
});
|
||||
return this.starting;
|
||||
}
|
||||
|
||||
async _start() {
|
||||
await mkdir(this.sessionDir, { recursive: true });
|
||||
this.status = 'connecting';
|
||||
this.lastError = '';
|
||||
const { state, saveCreds } = await useMultiFileAuthState(this.sessionDir);
|
||||
this.saveCreds = saveCreds;
|
||||
this.authRegistered = Boolean(state?.creds?.registered);
|
||||
if (!this.authRegistered) {
|
||||
this.status = 'qr';
|
||||
}
|
||||
const { version } = await fetchLatestBaileysVersion();
|
||||
const sock = makeWASocket({
|
||||
auth: state,
|
||||
version,
|
||||
browser: Browsers.ubuntu('RAGFlow'),
|
||||
printQRInTerminal: false,
|
||||
markOnlineOnConnect: false,
|
||||
syncFullHistory: false,
|
||||
getMessage: async (key) => {
|
||||
if (!key?.id) {
|
||||
return undefined;
|
||||
}
|
||||
return this.messageStore.get(key.id);
|
||||
},
|
||||
});
|
||||
this.sock = sock;
|
||||
|
||||
sock.ev.on('creds.update', this.saveCreds);
|
||||
sock.ev.on('connection.update', (update) => {
|
||||
void this._handleConnectionUpdate(update);
|
||||
});
|
||||
sock.ev.on('messages.upsert', (update) => {
|
||||
void this._handleMessagesUpsert(update);
|
||||
});
|
||||
|
||||
this.lastSnapshotAt = now();
|
||||
}
|
||||
|
||||
async _handleConnectionUpdate(update) {
|
||||
if (update.qr) {
|
||||
this.status = 'qr';
|
||||
this.lastError = '';
|
||||
this.qrUpdatedAt = now();
|
||||
this.qrDataUrl = await QRCode.toDataURL(update.qr, {
|
||||
errorCorrectionLevel: 'M',
|
||||
margin: 2,
|
||||
scale: 8,
|
||||
});
|
||||
this.lastSnapshotAt = now();
|
||||
this.broadcast({ type: 'snapshot', data: this.snapshot() });
|
||||
}
|
||||
|
||||
if (update.connection === 'open') {
|
||||
this.status = 'connected';
|
||||
this.lastError = '';
|
||||
this.connectedAt = now();
|
||||
this.qrDataUrl = '';
|
||||
this.sessionId = this.sock?.user?.id || this.sessionId;
|
||||
this.authRegistered = true;
|
||||
this.lastSnapshotAt = now();
|
||||
this.broadcast({ type: 'snapshot', data: this.snapshot() });
|
||||
return;
|
||||
}
|
||||
|
||||
if (update.connection === 'close') {
|
||||
const reason = update.lastDisconnect?.error?.output?.statusCode;
|
||||
const loggedOut = reason === DisconnectReason.loggedOut;
|
||||
this.status = loggedOut ? 'error' : 'disconnected';
|
||||
this.lastError =
|
||||
update.lastDisconnect?.error?.message ||
|
||||
(loggedOut ? 'WhatsApp session logged out.' : 'WhatsApp session disconnected.');
|
||||
this.lastSnapshotAt = now();
|
||||
this.sock = null;
|
||||
this.saveCreds = null;
|
||||
this.authRegistered = false;
|
||||
this.broadcast({ type: 'snapshot', data: this.snapshot() });
|
||||
|
||||
if (!this.stopping && !loggedOut) {
|
||||
clearTimeout(this.restartTimer);
|
||||
this.restartTimer = setTimeout(() => {
|
||||
void this.start();
|
||||
}, 3000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _handleMessagesUpsert(update) {
|
||||
for (const message of update.messages || []) {
|
||||
const key = safeMessageKey(message);
|
||||
if (key.fromMe || !key.id || !key.remoteJid) {
|
||||
continue;
|
||||
}
|
||||
const text = extractText(message.message);
|
||||
if (!text) {
|
||||
continue;
|
||||
}
|
||||
const jid = key.remoteJid;
|
||||
const event = {
|
||||
seq: ++this.eventSeq,
|
||||
kind: 'message',
|
||||
message_id: key.id,
|
||||
chat_id: jid,
|
||||
chat_type: detectChatType(jid),
|
||||
sender_id: key.participant || jid,
|
||||
text,
|
||||
raw: {
|
||||
key,
|
||||
message: message.message,
|
||||
pushName: message.pushName || '',
|
||||
messageTimestamp: message.messageTimestamp || 0,
|
||||
},
|
||||
};
|
||||
this.events.push(event);
|
||||
this.messageStore.set(key.id, message);
|
||||
this.broadcast({ type: 'event', data: event });
|
||||
if (this.events.length > 1000) {
|
||||
const dropped = this.events.slice(0, this.events.length - 500);
|
||||
this.events = this.events.slice(-500);
|
||||
for (const oldEvent of dropped) {
|
||||
if (oldEvent.kind === 'message' && oldEvent.message_id) {
|
||||
this.messageStore.delete(oldEvent.message_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.lastSnapshotAt = now();
|
||||
}
|
||||
}
|
||||
|
||||
async send(payload) {
|
||||
if (!this.sock) {
|
||||
throw new Error('WhatsApp session is not running.');
|
||||
}
|
||||
const jid = normalizeJid(payload.chat_id);
|
||||
if (!jid) {
|
||||
throw new Error(`Invalid chat_id: ${payload.chat_id}`);
|
||||
}
|
||||
const text = String(payload.text || '');
|
||||
const options = {};
|
||||
if (payload.reply_to_message_id) {
|
||||
const quoted = this.messageStore.get(String(payload.reply_to_message_id));
|
||||
if (quoted) {
|
||||
options.quoted = quoted;
|
||||
}
|
||||
}
|
||||
await this.sock.sendMessage(jid, { text }, options);
|
||||
this.lastSnapshotAt = now();
|
||||
}
|
||||
|
||||
async stop() {
|
||||
this.stopping = true;
|
||||
clearTimeout(this.restartTimer);
|
||||
this.restartTimer = null;
|
||||
const sock = this.sock;
|
||||
this.sock = null;
|
||||
this.saveCreds = null;
|
||||
if (sock) {
|
||||
try {
|
||||
sock.end?.(undefined);
|
||||
} catch {
|
||||
try {
|
||||
sock.ws?.close?.();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
this.status = 'stopped';
|
||||
this.lastError = '';
|
||||
this.qrDataUrl = '';
|
||||
this.qrUpdatedAt = 0;
|
||||
this.connectedAt = 0;
|
||||
this.sessionId = '';
|
||||
this.lastSnapshotAt = now();
|
||||
this.subscribers.clear();
|
||||
this.events = [];
|
||||
this.messageStore.clear();
|
||||
}
|
||||
}
|
||||
|
||||
const sessions = new Map();
|
||||
|
||||
function getSession(sessionKey) {
|
||||
const key = String(sessionKey || '').trim() || 'default';
|
||||
let session = sessions.get(key);
|
||||
if (!session) {
|
||||
session = new WhatsAppSession(key);
|
||||
sessions.set(key, session);
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
function getExistingSession(sessionKey) {
|
||||
const key = String(sessionKey || '').trim() || 'default';
|
||||
return sessions.get(key) || null;
|
||||
}
|
||||
|
||||
async function readBody(req) {
|
||||
const chunks = [];
|
||||
for await (const chunk of req) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
if (!chunks.length) {
|
||||
return {};
|
||||
}
|
||||
const raw = Buffer.concat(chunks).toString('utf8');
|
||||
if (!raw.trim()) {
|
||||
return {};
|
||||
}
|
||||
return JSON.parse(raw);
|
||||
}
|
||||
|
||||
function sendJson(res, statusCode, payload) {
|
||||
const body = JSON.stringify(payload);
|
||||
res.writeHead(statusCode, {
|
||||
'content-type': 'application/json; charset=utf-8',
|
||||
'content-length': Buffer.byteLength(body),
|
||||
});
|
||||
res.end(body);
|
||||
}
|
||||
|
||||
function sendError(res, statusCode, message) {
|
||||
sendJson(res, statusCode, { code: statusCode, message, data: null });
|
||||
}
|
||||
|
||||
const server = http.createServer(async (req, res) => {
|
||||
try {
|
||||
const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
|
||||
const parts = url.pathname.split('/').filter(Boolean);
|
||||
|
||||
if (req.method === 'GET' && url.pathname === '/health') {
|
||||
return sendJson(res, 200, { ok: true });
|
||||
}
|
||||
|
||||
if (parts[0] !== 'whatsapp' || parts.length < 2) {
|
||||
return sendError(res, 404, 'not found');
|
||||
}
|
||||
|
||||
if (!isAuthorized(req)) {
|
||||
return sendError(res, 401, 'unauthorized');
|
||||
}
|
||||
|
||||
const sessionKey = decodeURIComponent(parts[1]);
|
||||
|
||||
if (req.method === 'POST' && parts.length === 3 && parts[2] === 'start') {
|
||||
const session = getSession(sessionKey);
|
||||
await session.start();
|
||||
return sendJson(res, 200, { code: 0, message: '', data: session.snapshot() });
|
||||
}
|
||||
|
||||
if (req.method === 'GET' && parts.length === 3 && parts[2] === 'status') {
|
||||
const session = getExistingSession(sessionKey);
|
||||
if (!session) {
|
||||
return sendError(res, 404, 'session not found');
|
||||
}
|
||||
return sendJson(res, 200, { code: 0, message: '', data: session.snapshot() });
|
||||
}
|
||||
|
||||
if (req.method === 'POST' && parts.length === 3 && parts[2] === 'send') {
|
||||
const session = getExistingSession(sessionKey);
|
||||
if (!session) {
|
||||
return sendError(res, 404, 'session not found');
|
||||
}
|
||||
const body = await readBody(req);
|
||||
await session.send(body);
|
||||
return sendJson(res, 200, { code: 0, message: '', data: true });
|
||||
}
|
||||
|
||||
if (req.method === 'POST' && parts.length === 3 && parts[2] === 'stop') {
|
||||
const session = getExistingSession(sessionKey);
|
||||
if (!session) {
|
||||
return sendError(res, 404, 'session not found');
|
||||
}
|
||||
await session.stop();
|
||||
sessions.delete(sessionKey);
|
||||
return sendJson(res, 200, { code: 0, message: '', data: true });
|
||||
}
|
||||
|
||||
return sendError(res, 404, 'not found');
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.error(error);
|
||||
return sendJson(res, 500, { code: 500, message, data: null });
|
||||
}
|
||||
});
|
||||
|
||||
server.on('upgrade', (req, socket) => {
|
||||
try {
|
||||
const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
|
||||
const parts = url.pathname.split('/').filter(Boolean);
|
||||
if (parts[0] !== 'whatsapp' || parts.length !== 4 || parts[2] !== 'events' || parts[3] !== 'ws') {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
if (!isAuthorized(req)) {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
const sessionKey = decodeURIComponent(parts[1]);
|
||||
const key = req.headers['sec-websocket-key'];
|
||||
if (!key) {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
const accept = crypto.createHash('sha1').update(`${key}${WS_MAGIC}`).digest('base64');
|
||||
socket.write(
|
||||
[
|
||||
'HTTP/1.1 101 Switching Protocols',
|
||||
'Upgrade: websocket',
|
||||
'Connection: Upgrade',
|
||||
`Sec-WebSocket-Accept: ${accept}`,
|
||||
'',
|
||||
'',
|
||||
].join('\r\n'),
|
||||
);
|
||||
socket.setNoDelay(true);
|
||||
const session = getExistingSession(sessionKey);
|
||||
if (!session) {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
const after = Number.parseInt(url.searchParams.get('after') || '0', 10) || 0;
|
||||
session.addSubscriber(socket, after);
|
||||
socket.on('end', () => {
|
||||
session.subscribers.forEach((subscriber) => {
|
||||
if (subscriber.socket === socket) {
|
||||
session.subscribers.delete(subscriber);
|
||||
}
|
||||
});
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
socket.destroy();
|
||||
}
|
||||
});
|
||||
|
||||
server.listen(PORT, HOST, () => {
|
||||
console.log(`WhatsApp gateway listening on http://${HOST}:${PORT}`);
|
||||
});
|
||||
1677
api/channels/whatsapp/gateway-node/package-lock.json
generated
Normal file
1677
api/channels/whatsapp/gateway-node/package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
12
api/channels/whatsapp/gateway-node/package.json
Normal file
12
api/channels/whatsapp/gateway-node/package.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"name": "ragflow-whatsapp-gateway",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"start": "node index.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"baileys": "7.0.0-rc13",
|
||||
"qrcode": "^1.5.4"
|
||||
}
|
||||
}
|
||||
173
api/channels/whatsapp/gateway.py
Normal file
173
api/channels/whatsapp/gateway.py
Normal file
@@ -0,0 +1,173 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
_missing_command_warned = False
|
||||
_deps_install_warned = False
|
||||
|
||||
|
||||
def _env_flag(name: str, default: bool = False) -> bool:
|
||||
raw = os.getenv(name)
|
||||
if raw is None:
|
||||
return default
|
||||
return raw.strip().lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def _default_gateway_command() -> list[str]:
|
||||
raw = os.getenv("WHATSAPP_GATEWAY_COMMAND", "").strip()
|
||||
if raw:
|
||||
return shlex.split(raw)
|
||||
gateway_entry = Path(__file__).resolve().parent / "gateway-node" / "index.js"
|
||||
node = shutil.which("node")
|
||||
if node and gateway_entry.exists():
|
||||
return [node, str(gateway_entry)]
|
||||
return []
|
||||
|
||||
|
||||
def _gateway_dir() -> Path:
|
||||
return Path(__file__).resolve().parent / "gateway-node"
|
||||
|
||||
|
||||
@dataclass
|
||||
class WhatsAppGatewayConfig:
|
||||
command: list[str]
|
||||
cwd: str
|
||||
enabled: bool
|
||||
|
||||
|
||||
class WhatsAppGatewayRuntime:
|
||||
def __init__(self) -> None:
|
||||
self._process: Optional[asyncio.subprocess.Process] = None
|
||||
self._lock = asyncio.Lock()
|
||||
self._install_lock = asyncio.Lock()
|
||||
self._sync_generation = 0
|
||||
|
||||
def _config(self) -> WhatsAppGatewayConfig:
|
||||
workdir = os.getenv("WHATSAPP_GATEWAY_WORKDIR", "").strip()
|
||||
return WhatsAppGatewayConfig(
|
||||
command=_default_gateway_command(),
|
||||
cwd=workdir or str(_gateway_dir()),
|
||||
enabled=_env_flag("WHATSAPP_GATEWAY_ENABLED", True),
|
||||
)
|
||||
|
||||
def is_running(self) -> bool:
|
||||
return bool(self._process and self._process.returncode is None)
|
||||
|
||||
async def sync(self, enabled: bool) -> None:
|
||||
cfg = self._config()
|
||||
should_run = bool(enabled and cfg.enabled and cfg.command)
|
||||
async with self._lock:
|
||||
self._sync_generation += 1
|
||||
generation = self._sync_generation
|
||||
if not should_run:
|
||||
await self._stop_locked()
|
||||
return
|
||||
if self.is_running():
|
||||
return
|
||||
|
||||
await self._ensure_dependencies(cfg)
|
||||
|
||||
async with self._lock:
|
||||
if generation != self._sync_generation:
|
||||
return
|
||||
if not should_run:
|
||||
await self._stop_locked()
|
||||
return
|
||||
await self._start_locked(cfg)
|
||||
|
||||
async def _ensure_dependencies(self, cfg: WhatsAppGatewayConfig) -> None:
|
||||
global _deps_install_warned
|
||||
if not _env_flag("WHATSAPP_GATEWAY_AUTO_INSTALL", True):
|
||||
return
|
||||
|
||||
async with self._install_lock:
|
||||
gateway_dir = Path(cfg.cwd)
|
||||
node_modules = gateway_dir / "node_modules"
|
||||
if node_modules.exists():
|
||||
return
|
||||
|
||||
npm = shutil.which("npm")
|
||||
if not npm:
|
||||
if not _deps_install_warned:
|
||||
LOGGER.warning(
|
||||
"npm is not available; WhatsApp gateway dependencies cannot be installed automatically"
|
||||
)
|
||||
_deps_install_warned = True
|
||||
return
|
||||
|
||||
package_json = gateway_dir / "package.json"
|
||||
if not package_json.exists():
|
||||
LOGGER.warning("WhatsApp gateway package.json not found in %s", gateway_dir)
|
||||
return
|
||||
|
||||
LOGGER.info("installing WhatsApp gateway dependencies in %s", gateway_dir)
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
npm,
|
||||
"install",
|
||||
"--no-fund",
|
||||
"--no-audit",
|
||||
cwd=str(gateway_dir),
|
||||
)
|
||||
try:
|
||||
code = await asyncio.wait_for(proc.wait(), timeout=300)
|
||||
except asyncio.TimeoutError as ex:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
raise RuntimeError("npm install timed out after 300s") from ex
|
||||
if code != 0:
|
||||
raise RuntimeError(f"npm install failed with exit code {code}")
|
||||
_deps_install_warned = False
|
||||
|
||||
async def _start_locked(self, cfg: WhatsAppGatewayConfig) -> None:
|
||||
global _missing_command_warned
|
||||
if self.is_running():
|
||||
return
|
||||
if not cfg.command:
|
||||
if not _missing_command_warned:
|
||||
LOGGER.warning("WhatsApp gateway command is not configured; gateway will not start")
|
||||
_missing_command_warned = True
|
||||
return
|
||||
_missing_command_warned = False
|
||||
|
||||
env = os.environ.copy()
|
||||
env.setdefault("PYTHONUNBUFFERED", "1")
|
||||
LOGGER.info("starting WhatsApp gateway: %s", " ".join(cfg.command))
|
||||
self._process = await asyncio.create_subprocess_exec(
|
||||
*cfg.command,
|
||||
cwd=cfg.cwd,
|
||||
env=env,
|
||||
)
|
||||
|
||||
async def _stop_locked(self) -> None:
|
||||
proc = self._process
|
||||
if proc is None:
|
||||
return
|
||||
|
||||
if proc.returncode is None:
|
||||
LOGGER.info("stopping WhatsApp gateway")
|
||||
proc.terminate()
|
||||
try:
|
||||
await asyncio.wait_for(proc.wait(), timeout=10)
|
||||
except asyncio.TimeoutError:
|
||||
LOGGER.warning("WhatsApp gateway did not stop in time; killing it")
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
except Exception:
|
||||
LOGGER.debug("WhatsApp gateway stop failed", exc_info=True)
|
||||
|
||||
self._process = None
|
||||
|
||||
|
||||
_gateway_runtime = WhatsAppGatewayRuntime()
|
||||
|
||||
|
||||
async def sync_whatsapp_gateway(enabled: bool) -> None:
|
||||
await _gateway_runtime.sync(enabled)
|
||||
Reference in New Issue
Block a user