diff --git a/internal/dao/connector.go b/internal/dao/connector.go index 312d56a37b..e10a28221d 100644 --- a/internal/dao/connector.go +++ b/internal/dao/connector.go @@ -257,7 +257,7 @@ func (dao *ConnectorDAO) ListLogsByConnectorID(connectorID string, offset, limit "sync_logs.status", ). Distinct(). - Order("sync_logs.update_time DESC"). + Order("sync_logs.update_date DESC"). Offset(offset). Limit(limit). Scan(&logs).Error diff --git a/internal/handler/connector.go b/internal/handler/connector.go index 387de50fbf..d693b50ad3 100644 --- a/internal/handler/connector.go +++ b/internal/handler/connector.go @@ -173,6 +173,9 @@ func (h *ConnectorHandler) ListLogs(c *gin.Context) { jsonError(c, code, err.Error()) return } + if logs == nil { + logs = []*entity.ConnectorSyncLog{} + } c.JSON(http.StatusOK, gin.H{ "code": common.CodeSuccess, diff --git a/internal/handler/connector_test.go b/internal/handler/connector_test.go index 06c643b211..11710b5508 100644 --- a/internal/handler/connector_test.go +++ b/internal/handler/connector_test.go @@ -199,6 +199,7 @@ func TestConnectorHandlerListLogs(t *testing.T) { wantMsg string wantTotal float64 wantLogID string + wantLogs int }{ { name: "success", @@ -225,6 +226,17 @@ func TestConnectorHandlerListLogs(t *testing.T) { wantCode: common.CodeSuccess, wantTotal: 1, wantLogID: "log-1", + wantLogs: 1, + }, + { + name: "empty logs", + service: fakeConnectorService{ + logs: nil, + total: 0, + }, + wantCode: common.CodeSuccess, + wantTotal: 0, + wantLogs: 0, }, { name: "unauthorized", @@ -266,10 +278,23 @@ func TestConnectorHandlerListLogs(t *testing.T) { t.Fatalf("total=%v body=%v", data["total"], body) } logs := data["logs"].([]interface{}) + if len(logs) != tt.wantLogs { + t.Fatalf("logs=%v body=%v", logs, body) + } if logs[0].(map[string]interface{})["id"] != tt.wantLogID { t.Fatalf("logs=%v body=%v", logs, body) } } + if tt.wantLogID == "" && tt.wantMsg == "" { + data := body["data"].(map[string]interface{}) + if data["total"] != tt.wantTotal { + t.Fatalf("total=%v body=%v", data["total"], body) + } + logs := data["logs"].([]interface{}) + if len(logs) != tt.wantLogs { + t.Fatalf("logs=%v body=%v", logs, body) + } + } }) } } diff --git a/internal/handler/system.go b/internal/handler/system.go index 66052df2b1..fe07b92555 100644 --- a/internal/handler/system.go +++ b/internal/handler/system.go @@ -115,6 +115,27 @@ func (h *SystemHandler) GetConfigs(c *gin.Context) { }) } +// GetStatus get RAGFlow status +func (h *SystemHandler) GetStatus(c *gin.Context) { + _, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + status, err := h.systemService.GetStatus() + if err != nil { + jsonError(c, common.CodeServerError, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "data": status, + "message": "success", + }) +} + // GetVersion get RAGFlow version // @Summary Get RAGFlow Version // @Description Get the current version of the application diff --git a/internal/router/router.go b/internal/router/router.go index 2c625ecab9..e1f2e48f1f 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -337,6 +337,7 @@ func (r *Router) Setup(engine *gin.Engine) { system := v1.Group("/system") { system.GET("/configs", r.systemHandler.GetConfigs) + system.GET("/status", r.systemHandler.GetStatus) log := system.Group("/log") { // /api/v1/system/log GET diff --git a/internal/service/connector.go b/internal/service/connector.go index cbc8acb396..88449cbbd7 100644 --- a/internal/service/connector.go +++ b/internal/service/connector.go @@ -364,5 +364,8 @@ func (s *ConnectorService) ListLog(connectorID, userID string, page, pageSize in if err != nil { return nil, 0, common.CodeServerError, fmt.Errorf("failed to fetch connector logs: %w", err) } + if logs == nil { + logs = []*entity.ConnectorSyncLog{} + } return logs, total, common.CodeSuccess, nil } diff --git a/internal/service/system.go b/internal/service/system.go index dc2bdb72bd..aacfe02132 100644 --- a/internal/service/system.go +++ b/internal/service/system.go @@ -18,14 +18,17 @@ package service import ( "context" + "encoding/json" "fmt" + "strings" + "time" + "ragflow/internal/cache" "ragflow/internal/dao" "ragflow/internal/engine" "ragflow/internal/server" "ragflow/internal/storage" "ragflow/internal/utility" - "time" ) // SystemService system service @@ -82,6 +85,207 @@ func (s *SystemService) GetVersion() (*VersionResponse, error) { }, nil } +// ComponentStatus describes one dependency health check. +type ComponentStatus map[string]interface{} + +// StatusResponse system status response. +type StatusResponse struct { + DocEngine ComponentStatus `json:"doc_engine"` + Storage ComponentStatus `json:"storage"` + Database ComponentStatus `json:"database"` + Redis ComponentStatus `json:"redis"` + TaskExecutorHeartbeats map[string][]interface{} `json:"task_executor_heartbeats"` +} + +// GetStatus gets health status for core system dependencies. +func (s *SystemService) GetStatus() (*StatusResponse, error) { + return &StatusResponse{ + DocEngine: s.getDocEngineStatus(), + Storage: s.getStorageStatus(), + Database: s.getDatabaseStatus(), + Redis: s.getRedisStatus(), + TaskExecutorHeartbeats: s.getTaskExecutorHeartbeats(), + }, nil +} + +func (s *SystemService) getDocEngineStatus() ComponentStatus { + cfg := server.GetConfig() + docEngineType := "" + if cfg != nil { + docEngineType = strings.ToLower(string(cfg.DocEngine.Type)) + } + + startedAt := time.Now() + docEngine := engine.Get() + if docEngine == nil { + return ComponentStatus{ + "type": docEngineType, + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": "doc engine not initialized", + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := docEngine.Ping(ctx); err != nil { + return ComponentStatus{ + "type": docEngine.GetType(), + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": err.Error(), + } + } + + return ComponentStatus{ + "type": docEngine.GetType(), + "status": "green", + "elapsed": elapsedMilliseconds(startedAt), + } +} + +func (s *SystemService) getStorageStatus() ComponentStatus { + cfg := server.GetConfig() + storageType := "" + if cfg != nil { + storageType = strings.ToLower(string(cfg.StorageEngine.Type)) + } + + startedAt := time.Now() + factory := storage.GetStorageFactory().GetStorage() + if factory == nil { + return ComponentStatus{ + "type": storageType, + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": "storage not initialized", + } + } + + _, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if !factory.Health() { + return ComponentStatus{ + "type": storageType, + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": "storage health check failed", + } + } + + return ComponentStatus{ + "type": storageType, + "status": "green", + "elapsed": elapsedMilliseconds(startedAt), + } +} + +func (s *SystemService) getDatabaseStatus() ComponentStatus { + cfg := server.GetConfig() + databaseType := "" + if cfg != nil { + databaseType = cfg.Database.Driver + } + + startedAt := time.Now() + if dao.DB == nil { + return ComponentStatus{ + "type": databaseType, + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": "database not initialized", + } + } + + sqlDB, err := dao.GetDB().DB() + if err != nil { + return ComponentStatus{ + "type": databaseType, + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": err.Error(), + } + } + + if err = sqlDB.Ping(); err != nil { + return ComponentStatus{ + "type": databaseType, + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": err.Error(), + } + } + + return ComponentStatus{ + "type": databaseType, + "status": "green", + "elapsed": elapsedMilliseconds(startedAt), + } +} + +func (s *SystemService) getRedisStatus() ComponentStatus { + startedAt := time.Now() + redisClient := cache.Get() + if redisClient == nil { + return ComponentStatus{ + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": "redis not initialized", + } + } + if !redisClient.Health() { + return ComponentStatus{ + "status": "red", + "elapsed": elapsedMilliseconds(startedAt), + "error": "Lost connection!", + } + } + + return ComponentStatus{ + "status": "green", + "elapsed": elapsedMilliseconds(startedAt), + } +} + +func (s *SystemService) getTaskExecutorHeartbeats() map[string][]interface{} { + heartbeatsByExecutor := map[string][]interface{}{} + redisClient := cache.Get() + if redisClient == nil { + return heartbeatsByExecutor + } + + taskExecutorIDs, err := redisClient.SMembers("TASKEXE") + if err != nil { + return heartbeatsByExecutor + } + + now := float64(time.Now().Unix()) + for _, taskExecutorID := range taskExecutorIDs { + rawHeartbeats, err := redisClient.ZRangeByScore(taskExecutorID, now-60*30, now) + if err != nil { + continue + } + + heartbeats := make([]interface{}, 0, len(rawHeartbeats)) + for _, rawHeartbeat := range rawHeartbeats { + var heartbeat interface{} + if err := json.Unmarshal([]byte(rawHeartbeat), &heartbeat); err != nil { + heartbeats = append(heartbeats, rawHeartbeat) + continue + } + heartbeats = append(heartbeats, heartbeat) + } + heartbeatsByExecutor[taskExecutorID] = heartbeats + } + + return heartbeatsByExecutor +} + +func elapsedMilliseconds(startedAt time.Time) string { + return fmt.Sprintf("%.1f", float64(time.Since(startedAt).Microseconds())/1000.0) +} + func okNok(ok bool) string { if ok { return "ok"