mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-03 17:21:59 +08:00
feat[Go]: implement /api/v1/system/status GET (#15348)
### What problem does this PR solve? As title ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] New Feature (non-breaking change which adds functionality) - [x] Refactoring
This commit is contained in:
@@ -257,7 +257,7 @@ func (dao *ConnectorDAO) ListLogsByConnectorID(connectorID string, offset, limit
|
||||
"sync_logs.status",
|
||||
).
|
||||
Distinct().
|
||||
Order("sync_logs.update_time DESC").
|
||||
Order("sync_logs.update_date DESC").
|
||||
Offset(offset).
|
||||
Limit(limit).
|
||||
Scan(&logs).Error
|
||||
|
||||
@@ -173,6 +173,9 @@ func (h *ConnectorHandler) ListLogs(c *gin.Context) {
|
||||
jsonError(c, code, err.Error())
|
||||
return
|
||||
}
|
||||
if logs == nil {
|
||||
logs = []*entity.ConnectorSyncLog{}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"code": common.CodeSuccess,
|
||||
|
||||
@@ -199,6 +199,7 @@ func TestConnectorHandlerListLogs(t *testing.T) {
|
||||
wantMsg string
|
||||
wantTotal float64
|
||||
wantLogID string
|
||||
wantLogs int
|
||||
}{
|
||||
{
|
||||
name: "success",
|
||||
@@ -225,6 +226,17 @@ func TestConnectorHandlerListLogs(t *testing.T) {
|
||||
wantCode: common.CodeSuccess,
|
||||
wantTotal: 1,
|
||||
wantLogID: "log-1",
|
||||
wantLogs: 1,
|
||||
},
|
||||
{
|
||||
name: "empty logs",
|
||||
service: fakeConnectorService{
|
||||
logs: nil,
|
||||
total: 0,
|
||||
},
|
||||
wantCode: common.CodeSuccess,
|
||||
wantTotal: 0,
|
||||
wantLogs: 0,
|
||||
},
|
||||
{
|
||||
name: "unauthorized",
|
||||
@@ -266,10 +278,23 @@ func TestConnectorHandlerListLogs(t *testing.T) {
|
||||
t.Fatalf("total=%v body=%v", data["total"], body)
|
||||
}
|
||||
logs := data["logs"].([]interface{})
|
||||
if len(logs) != tt.wantLogs {
|
||||
t.Fatalf("logs=%v body=%v", logs, body)
|
||||
}
|
||||
if logs[0].(map[string]interface{})["id"] != tt.wantLogID {
|
||||
t.Fatalf("logs=%v body=%v", logs, body)
|
||||
}
|
||||
}
|
||||
if tt.wantLogID == "" && tt.wantMsg == "" {
|
||||
data := body["data"].(map[string]interface{})
|
||||
if data["total"] != tt.wantTotal {
|
||||
t.Fatalf("total=%v body=%v", data["total"], body)
|
||||
}
|
||||
logs := data["logs"].([]interface{})
|
||||
if len(logs) != tt.wantLogs {
|
||||
t.Fatalf("logs=%v body=%v", logs, body)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +115,27 @@ func (h *SystemHandler) GetConfigs(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// GetStatus get RAGFlow status
|
||||
func (h *SystemHandler) GetStatus(c *gin.Context) {
|
||||
_, errorCode, errorMessage := GetUser(c)
|
||||
if errorCode != common.CodeSuccess {
|
||||
jsonError(c, errorCode, errorMessage)
|
||||
return
|
||||
}
|
||||
|
||||
status, err := h.systemService.GetStatus()
|
||||
if err != nil {
|
||||
jsonError(c, common.CodeServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"code": common.CodeSuccess,
|
||||
"data": status,
|
||||
"message": "success",
|
||||
})
|
||||
}
|
||||
|
||||
// GetVersion get RAGFlow version
|
||||
// @Summary Get RAGFlow Version
|
||||
// @Description Get the current version of the application
|
||||
|
||||
@@ -337,6 +337,7 @@ func (r *Router) Setup(engine *gin.Engine) {
|
||||
system := v1.Group("/system")
|
||||
{
|
||||
system.GET("/configs", r.systemHandler.GetConfigs)
|
||||
system.GET("/status", r.systemHandler.GetStatus)
|
||||
log := system.Group("/log")
|
||||
{
|
||||
// /api/v1/system/log GET
|
||||
|
||||
@@ -364,5 +364,8 @@ func (s *ConnectorService) ListLog(connectorID, userID string, page, pageSize in
|
||||
if err != nil {
|
||||
return nil, 0, common.CodeServerError, fmt.Errorf("failed to fetch connector logs: %w", err)
|
||||
}
|
||||
if logs == nil {
|
||||
logs = []*entity.ConnectorSyncLog{}
|
||||
}
|
||||
return logs, total, common.CodeSuccess, nil
|
||||
}
|
||||
|
||||
@@ -18,14 +18,17 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ragflow/internal/cache"
|
||||
"ragflow/internal/dao"
|
||||
"ragflow/internal/engine"
|
||||
"ragflow/internal/server"
|
||||
"ragflow/internal/storage"
|
||||
"ragflow/internal/utility"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SystemService system service
|
||||
@@ -82,6 +85,207 @@ func (s *SystemService) GetVersion() (*VersionResponse, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ComponentStatus describes one dependency health check.
|
||||
type ComponentStatus map[string]interface{}
|
||||
|
||||
// StatusResponse system status response.
|
||||
type StatusResponse struct {
|
||||
DocEngine ComponentStatus `json:"doc_engine"`
|
||||
Storage ComponentStatus `json:"storage"`
|
||||
Database ComponentStatus `json:"database"`
|
||||
Redis ComponentStatus `json:"redis"`
|
||||
TaskExecutorHeartbeats map[string][]interface{} `json:"task_executor_heartbeats"`
|
||||
}
|
||||
|
||||
// GetStatus gets health status for core system dependencies.
|
||||
func (s *SystemService) GetStatus() (*StatusResponse, error) {
|
||||
return &StatusResponse{
|
||||
DocEngine: s.getDocEngineStatus(),
|
||||
Storage: s.getStorageStatus(),
|
||||
Database: s.getDatabaseStatus(),
|
||||
Redis: s.getRedisStatus(),
|
||||
TaskExecutorHeartbeats: s.getTaskExecutorHeartbeats(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *SystemService) getDocEngineStatus() ComponentStatus {
|
||||
cfg := server.GetConfig()
|
||||
docEngineType := ""
|
||||
if cfg != nil {
|
||||
docEngineType = strings.ToLower(string(cfg.DocEngine.Type))
|
||||
}
|
||||
|
||||
startedAt := time.Now()
|
||||
docEngine := engine.Get()
|
||||
if docEngine == nil {
|
||||
return ComponentStatus{
|
||||
"type": docEngineType,
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": "doc engine not initialized",
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := docEngine.Ping(ctx); err != nil {
|
||||
return ComponentStatus{
|
||||
"type": docEngine.GetType(),
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
return ComponentStatus{
|
||||
"type": docEngine.GetType(),
|
||||
"status": "green",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SystemService) getStorageStatus() ComponentStatus {
|
||||
cfg := server.GetConfig()
|
||||
storageType := ""
|
||||
if cfg != nil {
|
||||
storageType = strings.ToLower(string(cfg.StorageEngine.Type))
|
||||
}
|
||||
|
||||
startedAt := time.Now()
|
||||
factory := storage.GetStorageFactory().GetStorage()
|
||||
if factory == nil {
|
||||
return ComponentStatus{
|
||||
"type": storageType,
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": "storage not initialized",
|
||||
}
|
||||
}
|
||||
|
||||
_, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if !factory.Health() {
|
||||
return ComponentStatus{
|
||||
"type": storageType,
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": "storage health check failed",
|
||||
}
|
||||
}
|
||||
|
||||
return ComponentStatus{
|
||||
"type": storageType,
|
||||
"status": "green",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SystemService) getDatabaseStatus() ComponentStatus {
|
||||
cfg := server.GetConfig()
|
||||
databaseType := ""
|
||||
if cfg != nil {
|
||||
databaseType = cfg.Database.Driver
|
||||
}
|
||||
|
||||
startedAt := time.Now()
|
||||
if dao.DB == nil {
|
||||
return ComponentStatus{
|
||||
"type": databaseType,
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": "database not initialized",
|
||||
}
|
||||
}
|
||||
|
||||
sqlDB, err := dao.GetDB().DB()
|
||||
if err != nil {
|
||||
return ComponentStatus{
|
||||
"type": databaseType,
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
if err = sqlDB.Ping(); err != nil {
|
||||
return ComponentStatus{
|
||||
"type": databaseType,
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
return ComponentStatus{
|
||||
"type": databaseType,
|
||||
"status": "green",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SystemService) getRedisStatus() ComponentStatus {
|
||||
startedAt := time.Now()
|
||||
redisClient := cache.Get()
|
||||
if redisClient == nil {
|
||||
return ComponentStatus{
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": "redis not initialized",
|
||||
}
|
||||
}
|
||||
if !redisClient.Health() {
|
||||
return ComponentStatus{
|
||||
"status": "red",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
"error": "Lost connection!",
|
||||
}
|
||||
}
|
||||
|
||||
return ComponentStatus{
|
||||
"status": "green",
|
||||
"elapsed": elapsedMilliseconds(startedAt),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SystemService) getTaskExecutorHeartbeats() map[string][]interface{} {
|
||||
heartbeatsByExecutor := map[string][]interface{}{}
|
||||
redisClient := cache.Get()
|
||||
if redisClient == nil {
|
||||
return heartbeatsByExecutor
|
||||
}
|
||||
|
||||
taskExecutorIDs, err := redisClient.SMembers("TASKEXE")
|
||||
if err != nil {
|
||||
return heartbeatsByExecutor
|
||||
}
|
||||
|
||||
now := float64(time.Now().Unix())
|
||||
for _, taskExecutorID := range taskExecutorIDs {
|
||||
rawHeartbeats, err := redisClient.ZRangeByScore(taskExecutorID, now-60*30, now)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
heartbeats := make([]interface{}, 0, len(rawHeartbeats))
|
||||
for _, rawHeartbeat := range rawHeartbeats {
|
||||
var heartbeat interface{}
|
||||
if err := json.Unmarshal([]byte(rawHeartbeat), &heartbeat); err != nil {
|
||||
heartbeats = append(heartbeats, rawHeartbeat)
|
||||
continue
|
||||
}
|
||||
heartbeats = append(heartbeats, heartbeat)
|
||||
}
|
||||
heartbeatsByExecutor[taskExecutorID] = heartbeats
|
||||
}
|
||||
|
||||
return heartbeatsByExecutor
|
||||
}
|
||||
|
||||
func elapsedMilliseconds(startedAt time.Time) string {
|
||||
return fmt.Sprintf("%.1f", float64(time.Since(startedAt).Microseconds())/1000.0)
|
||||
}
|
||||
|
||||
func okNok(ok bool) string {
|
||||
if ok {
|
||||
return "ok"
|
||||
|
||||
Reference in New Issue
Block a user