Files
ragflow/admin/server/config.py

334 lines
11 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import threading
from enum import Enum
from pydantic import BaseModel
from typing import Any
from common.config_utils import read_config
from urllib.parse import urlparse
class BaseConfig(BaseModel):
id: int
name: str
host: str
port: int
service_type: str
detail_func_name: str
def to_dict(self) -> dict[str, Any]:
return {"id": self.id, "name": self.name, "host": self.host, "port": self.port, "service_type": self.service_type}
class ServiceConfigs:
configs = list[BaseConfig]
def __init__(self):
self.configs = []
self.lock = threading.Lock()
SERVICE_CONFIGS = ServiceConfigs
class ServiceType(Enum):
METADATA = "metadata"
RETRIEVAL = "retrieval"
MESSAGE_QUEUE = "message_queue"
RAGFLOW_SERVER = "ragflow_server"
TASK_EXECUTOR = "task_executor"
FILE_STORE = "file_store"
class MetaConfig(BaseConfig):
meta_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["meta_type"] = self.meta_type
result["extra"] = extra_dict
return result
class MySQLConfig(MetaConfig):
username: str
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["username"] = self.username
extra_dict["password"] = self.password
result["extra"] = extra_dict
return result
class PostgresConfig(MetaConfig):
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
return result
class RetrievalConfig(BaseConfig):
retrieval_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["retrieval_type"] = self.retrieval_type
result["extra"] = extra_dict
return result
class InfinityConfig(RetrievalConfig):
db_name: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["db_name"] = self.db_name
result["extra"] = extra_dict
return result
class ElasticsearchConfig(RetrievalConfig):
username: str
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["username"] = self.username
extra_dict["password"] = self.password
result["extra"] = extra_dict
return result
class MessageQueueConfig(BaseConfig):
mq_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["mq_type"] = self.mq_type
result["extra"] = extra_dict
return result
class RedisConfig(MessageQueueConfig):
database: int
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["database"] = self.database
extra_dict["password"] = self.password
result["extra"] = extra_dict
return result
class RabbitMQConfig(MessageQueueConfig):
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
return result
class RAGFlowServerConfig(BaseConfig):
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
return result
class TaskExecutorConfig(BaseConfig):
message_queue_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
result["extra"]["message_queue_type"] = self.message_queue_type
return result
class FileStoreConfig(BaseConfig):
store_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["store_type"] = self.store_type
result["extra"] = extra_dict
return result
class MinioConfig(FileStoreConfig):
user: str
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if "extra" not in result:
result["extra"] = dict()
extra_dict = result["extra"].copy()
extra_dict["user"] = self.user
extra_dict["password"] = self.password
result["extra"] = extra_dict
return result
def load_configurations(config_path: str) -> list[BaseConfig]:
raw_configs = read_config(config_path)
configurations = []
ragflow_count = 0
id_count = 0
for k, v in raw_configs.items():
match k:
case "ragflow":
name: str = f"ragflow_{ragflow_count}"
host: str = v["host"]
http_port: int = v["http_port"]
config = RAGFlowServerConfig(id=id_count, name=name, host=host, port=http_port, service_type="ragflow_server", detail_func_name="check_ragflow_server_alive")
configurations.append(config)
id_count += 1
case "es":
name: str = "elasticsearch"
url = v["hosts"]
parsed = urlparse(url)
host: str = parsed.hostname
port: int = parsed.port
username: str = v.get("username")
password: str = v.get("password")
config = ElasticsearchConfig(
id=id_count,
name=name,
host=host,
port=port,
service_type="retrieval",
retrieval_type="elasticsearch",
username=username,
password=password,
detail_func_name="get_es_cluster_stats",
)
configurations.append(config)
id_count += 1
case "infinity":
name: str = "infinity"
url = v["uri"]
parts = url.split(":", 1)
host = parts[0]
port = int(parts[1])
database: str = v.get("db_name", "default_db")
config = InfinityConfig(id=id_count, name=name, host=host, port=port, service_type="retrieval", retrieval_type="infinity", db_name=database, detail_func_name="get_infinity_status")
configurations.append(config)
id_count += 1
case "minio_0":
name: str = "minio_0"
url = v["host"]
parts = url.split(":", 1)
host = parts[0]
port = int(parts[1])
user = v.get("user")
password = v.get("password")
config = MinioConfig(id=id_count, name=name, host=host, port=port, user=user, password=password, service_type="file_store", store_type="minio", detail_func_name="check_minio_alive")
configurations.append(config)
id_count += 1
case "minio":
name: str = "minio"
url = v["host"]
parts = url.split(":", 1)
host = parts[0]
port = int(parts[1])
user = v.get("user")
password = v.get("password")
config = MinioConfig(id=id_count, name=name, host=host, port=port, user=user, password=password, service_type="file_store", store_type="minio", detail_func_name="check_minio_alive")
configurations.append(config)
id_count += 1
case "redis":
name: str = "redis"
url = v["host"]
parts = url.split(":", 1)
host = parts[0]
port = int(parts[1])
password = v.get("password")
db: int = v.get("db")
config = RedisConfig(id=id_count, name=name, host=host, port=port, password=password, database=db, service_type="message_queue", mq_type="redis", detail_func_name="get_redis_info")
configurations.append(config)
id_count += 1
case "mysql":
name: str = "mysql"
host: str = v.get("host")
port: int = v.get("port")
username = v.get("user")
password = v.get("password")
config = MySQLConfig(
id=id_count, name=name, host=host, port=port, username=username, password=password, service_type="meta_data", meta_type="mysql", detail_func_name="get_mysql_status"
)
configurations.append(config)
id_count += 1
case "admin":
pass
case "task_executor":
name: str = "task_executor"
host: str = v.get("host", "")
port: int = v.get("port", 0)
message_queue_type: str = v.get("message_queue_type")
config = TaskExecutorConfig(
id=id_count, name=name, host=host, port=port, message_queue_type=message_queue_type, service_type="task_executor", detail_func_name="check_task_executor_alive"
)
configurations.append(config)
id_count += 1
case "rabbitmq":
name: str = "rabbitmq"
host: str = v.get("host")
port: int = v.get("port")
config = RabbitMQConfig(id=id_count, name=name, host=host, port=port, service_type="message_queue", mq_type="rabbitmq", detail_func_name="check_rabbitmq_alive")
configurations.append(config)
id_count += 1
case _:
logging.warning(f"Unknown configuration key: {k}")
continue
return configurations