From d766e4912837bea8d352a2fb4878b73ef962e62c Mon Sep 17 00:00:00 2001 From: Haruko386 Date: Fri, 29 May 2026 19:32:21 +0800 Subject: [PATCH] feat[Go]: implement /system/stats and refactor /system/config/log (#15407) ### What problem does this PR solve? As title ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Refactoring --- internal/common/logger.go | 133 +++++++++++++++++++++++++++++-------- internal/dao/api_token.go | 40 +++++++++++ internal/handler/stats.go | 69 +++++++++++++++++++ internal/handler/system.go | 28 ++++---- internal/router/router.go | 18 +++-- internal/service/stats.go | 73 ++++++++++++++++++++ 6 files changed, 314 insertions(+), 47 deletions(-) create mode 100644 internal/handler/stats.go create mode 100644 internal/service/stats.go diff --git a/internal/common/logger.go b/internal/common/logger.go index 8650ba06c6..f856761c04 100644 --- a/internal/common/logger.go +++ b/internal/common/logger.go @@ -18,7 +18,9 @@ package common import ( "fmt" + "os" "runtime" + "strings" "sync" "go.uber.org/zap" @@ -30,23 +32,70 @@ var ( Sugar *zap.SugaredLogger levelMu sync.RWMutex atomicLevel zap.AtomicLevel + pkgLevels map[string]string ) +func parseZapLevel(level string) (zapcore.Level, error) { + switch strings.ToLower(strings.TrimSpace(level)) { + case "debug": + return zapcore.DebugLevel, nil + case "info": + return zapcore.InfoLevel, nil + case "warn", "warning": + return zapcore.WarnLevel, nil + case "error": + return zapcore.ErrorLevel, nil + case "fatal": + return zapcore.FatalLevel, nil + case "panic": + return zapcore.PanicLevel, nil + default: + return zapcore.InfoLevel, fmt.Errorf("unknown log level: %s", level) + } +} + +func logLevelName(level zapcore.Level) string { + if level == zapcore.WarnLevel { + return "WARNING" + } + return strings.ToUpper(level.String()) +} + +func initPackageLogLevels(rootLevel zapcore.Level) { + levels := make(map[string]string) + for _, item := range strings.Split(os.Getenv("LOG_LEVELS"), ",") { + terms := strings.SplitN(item, "=", 2) + if len(terms) != 2 { + continue + } + pkgName := strings.TrimSpace(terms[0]) + if pkgName == "" { + continue + } + level, err := parseZapLevel(terms[1]) + if err != nil { + level = zapcore.InfoLevel + } + levels[pkgName] = logLevelName(level) + } + // I set it to align with python for now, we shall change it later before ragflow 1.0 + if _, ok := levels["peewee"]; !ok { + levels["peewee"] = logLevelName(zapcore.WarnLevel) + } + if _, ok := levels["pdfminer"]; !ok { + levels["pdfminer"] = logLevelName(zapcore.WarnLevel) + } + if _, ok := levels["root"]; !ok { + levels["root"] = logLevelName(rootLevel) + } + pkgLevels = levels +} + // Init initializes the global logger // Note: This requires zap to be installed: go get go.uber.org/zap func Init(level string) error { - // Parse log level - var zapLevel zapcore.Level - switch level { - case "debug": - zapLevel = zapcore.DebugLevel - case "info": - zapLevel = zapcore.InfoLevel - case "warn": - zapLevel = zapcore.WarnLevel - case "error": - zapLevel = zapcore.ErrorLevel - default: + zapLevel, err := parseZapLevel(level) + if err != nil { zapLevel = zapcore.InfoLevel } @@ -88,6 +137,10 @@ func Init(level string) error { Logger = logger Sugar = logger.Sugar() + levelMu.Lock() + initPackageLogLevels(zapLevel) + levelMu.Unlock() + return nil } @@ -155,29 +208,51 @@ func GetLevel() string { return atomicLevel.String() } +// GetLogLevels returns Python-compatible package log levels. +func GetLogLevels() map[string]string { + levelMu.RLock() + defer levelMu.RUnlock() + + levels := make(map[string]string, len(pkgLevels)) + for pkgName, level := range pkgLevels { + levels[pkgName] = level + } + return levels +} + // SetLevel sets the log level at runtime func SetLevel(level string) error { levelMu.Lock() defer levelMu.Unlock() - var zapLevel zapcore.Level - switch level { - case "debug": - zapLevel = zapcore.DebugLevel - case "info": - zapLevel = zapcore.InfoLevel - case "warn", "warning": - zapLevel = zapcore.WarnLevel - case "error": - zapLevel = zapcore.ErrorLevel - case "fatal": - zapLevel = zapcore.FatalLevel - case "panic": - zapLevel = zapcore.PanicLevel - default: - return fmt.Errorf("unknown log level: %s", level) + zapLevel, err := parseZapLevel(level) + if err != nil { + return err } - atomicLevel.SetLevel(zapLevel) + if pkgLevels == nil { + pkgLevels = make(map[string]string) + } + pkgLevels["root"] = logLevelName(zapLevel) + return nil +} + +// SetPackageLogLevel sets a Python-compatible package log level at runtime. +func SetPackageLogLevel(pkgName, level string) error { + zapLevel, err := parseZapLevel(level) + if err != nil { + return err + } + + levelMu.Lock() + defer levelMu.Unlock() + + if pkgLevels == nil { + pkgLevels = make(map[string]string) + } + pkgLevels[pkgName] = logLevelName(zapLevel) + if pkgName == "root" { + atomicLevel.SetLevel(zapLevel) + } return nil } diff --git a/internal/dao/api_token.go b/internal/dao/api_token.go index 1ce91a6654..9385bab00c 100644 --- a/internal/dao/api_token.go +++ b/internal/dao/api_token.go @@ -79,6 +79,46 @@ func NewAPI4ConversationDAO() *API4ConversationDAO { return &API4ConversationDAO{} } +// ConversationStatsRow is one daily aggregate row for api_4_conversation. +type ConversationStatsRow struct { + Dt string `gorm:"column:dt"` + PV int64 `gorm:"column:pv"` + UV int64 `gorm:"column:uv"` + Tokens float64 `gorm:"column:tokens"` + Duration float64 `gorm:"column:duration"` + Round float64 `gorm:"column:round"` + ThumbUp int64 `gorm:"column:thumb_up"` +} + +// Stats returns daily conversation aggregates for a tenant. +func (dao *API4ConversationDAO) Stats(tenantID, fromDate, toDate string, source *string) ([]ConversationStatsRow, error) { + var rows []ConversationStatsRow + dateExpr := "DATE_FORMAT(a.create_date, '%Y-%m-%d 00:00:00')" + db := DB.Table("api_4_conversation AS a"). + Select(` + DATE_FORMAT(a.create_date, '%Y-%m-%d 00:00:00') AS dt, + COUNT(a.id) AS pv, + COUNT(DISTINCT a.user_id) AS uv, + COALESCE(SUM(a.tokens), 0) AS tokens, + COALESCE(SUM(a.duration), 0) AS duration, + COALESCE(AVG(a.round), 0) AS round, + COALESCE(SUM(a.thumb_up), 0) AS thumb_up + `). + Joins("JOIN dialog AS d ON a.dialog_id = d.id AND d.tenant_id = ?", tenantID). + Where("a.create_date >= ? AND a.create_date <= ?", fromDate, toDate) + + if source == nil { + db = db.Where("a.source IS NULL") + } else { + db = db.Where("a.source = ?", *source) + } + + err := db.Group(dateExpr). + Order(dateExpr). + Scan(&rows).Error + return rows, err +} + // DeleteByDialogIDs deletes API4Conversations by dialog IDs (hard delete) func (dao *API4ConversationDAO) DeleteByDialogIDs(dialogIDs []string) (int64, error) { if len(dialogIDs) == 0 { diff --git a/internal/handler/stats.go b/internal/handler/stats.go new file mode 100644 index 0000000000..e586a7f7f8 --- /dev/null +++ b/internal/handler/stats.go @@ -0,0 +1,69 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package handler + +import ( + "errors" + "net/http" + "time" + + "ragflow/internal/common" + "ragflow/internal/service" + + "github.com/gin-gonic/gin" +) + +// GetStats returns API conversation statistics for the current user's tenant. +func (h *SystemHandler) GetStats(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + now := time.Now() + fromDate := c.DefaultQuery("from_date", now.AddDate(0, 0, -7).Format("2006-01-02 00:00:00")) + toDate := c.DefaultQuery("to_date", now.Format("2006-01-02 15:04:05")) + if len(toDate) == 10 { + toDate += " 23:59:59" + } + + var source *string + if _, ok := c.GetQuery("canvas_id"); ok { + agentSource := "agent" + source = &agentSource + } + + stats, err := h.systemService.GetStats(user.ID, fromDate, toDate, source) + if err != nil { + code := common.CodeExceptionError + if errors.Is(err, service.ErrTenantNotFound) { + code = common.CodeDataError + } + c.JSON(http.StatusOK, gin.H{ + "code": code, + "message": err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "message": "success", + "data": stats, + }) +} diff --git a/internal/handler/system.go b/internal/handler/system.go index fe07b92555..e5ac475a17 100644 --- a/internal/handler/system.go +++ b/internal/handler/system.go @@ -164,44 +164,46 @@ func (h *SystemHandler) GetVersion(c *gin.Context) { // GetLogLevel returns the current log level func (h *SystemHandler) GetLogLevel(c *gin.Context) { - level := common.GetLevel() c.JSON(http.StatusOK, gin.H{ "code": 0, "message": "success", - "data": gin.H{"level": level}, + "data": common.GetLogLevels(), }) } // SetLogLevelRequest set log level request type SetLogLevelRequest struct { - Level string `json:"level" binding:"required"` + PkgName string `json:"pkg_name" binding:"required"` + Level string `json:"level" binding:"required"` } // SetLogLevel sets the log level at runtime func (h *SystemHandler) SetLogLevel(c *gin.Context) { var req SetLogLevelRequest if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": "level is required", + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeDataError, + "message": "pkg_name and level are required", }) return } - if err := common.SetLevel(req.Level); err != nil { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": err.Error(), + if err := common.SetPackageLogLevel(req.PkgName, req.Level); err != nil { + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeDataError, + "message": "Invalid log level: " + req.Level, }) return } config := server.GetConfig() - config.Log.Level = req.Level + if req.PkgName == "root" && config != nil { + config.Log.Level = common.GetLevel() + } c.JSON(http.StatusOK, gin.H{ "code": 0, - "message": "Log level updated successfully", - "data": gin.H{"level": req.Level}, + "message": "success", + "data": gin.H{"pkg_name": req.PkgName, "level": req.Level}, }) } diff --git a/internal/router/router.go b/internal/router/router.go index a6a8fe9e6c..b41ab52042 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -343,14 +343,22 @@ func (r *Router) Setup(engine *gin.Engine) { { system.GET("/configs", r.systemHandler.GetConfigs) system.GET("/status", r.systemHandler.GetStatus) - log := system.Group("/log") + system.GET("/stats", r.systemHandler.GetStats) + + config := system.Group("/config") { - // /api/v1/system/log GET - log.GET("", r.systemHandler.GetLogLevel) - // /api/v1/system/log PUT - log.PUT("", r.systemHandler.SetLogLevel) + config.GET("/log", r.systemHandler.GetLogLevel) + config.PUT("/log", r.systemHandler.SetLogLevel) } + //log := system.Group("/log") + //{ + // // /api/v1/system/log GET + // log.GET("", r.systemHandler.GetLogLevel) + // // /api/v1/system/log PUT + // log.PUT("", r.systemHandler.SetLogLevel) + //} + tokens := system.Group("/tokens") { // list tokens /api/v1/system/tokens GET diff --git a/internal/service/stats.go b/internal/service/stats.go new file mode 100644 index 0000000000..39227cf7ab --- /dev/null +++ b/internal/service/stats.go @@ -0,0 +1,73 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +import ( + "errors" + + "ragflow/internal/dao" +) + +// ErrTenantNotFound indicates the current user has no tenant relation. +var ErrTenantNotFound = errors.New("Tenant not found!") + +// StatPoint matches the frontend [date, value] tuple shape. +type StatPoint [2]interface{} + +// StatsResponse matches Python GET /api/v1/system/stats response data. +type StatsResponse struct { + PV []StatPoint `json:"pv"` + UV []StatPoint `json:"uv"` + Speed []StatPoint `json:"speed"` + Tokens []StatPoint `json:"tokens"` + Round []StatPoint `json:"round"` + ThumbUp []StatPoint `json:"thumb_up"` +} + +// GetStats returns daily API conversation statistics for the first tenant of a user. +func (s *SystemService) GetStats(userID, fromDate, toDate string, source *string) (*StatsResponse, error) { + userTenantDAO := dao.NewUserTenantDAO() + tenants, err := userTenantDAO.GetByUserID(userID) + if err != nil || len(tenants) == 0 { + return nil, ErrTenantNotFound + } + + rows, err := dao.NewAPI4ConversationDAO().Stats(tenants[0].TenantID, fromDate, toDate, source) + if err != nil { + return nil, err + } + + response := &StatsResponse{ + PV: make([]StatPoint, 0, len(rows)), + UV: make([]StatPoint, 0, len(rows)), + Speed: make([]StatPoint, 0, len(rows)), + Tokens: make([]StatPoint, 0, len(rows)), + Round: make([]StatPoint, 0, len(rows)), + ThumbUp: make([]StatPoint, 0, len(rows)), + } + + for _, row := range rows { + response.PV = append(response.PV, StatPoint{row.Dt, row.PV}) + response.UV = append(response.UV, StatPoint{row.Dt, row.UV}) + response.Speed = append(response.Speed, StatPoint{row.Dt, row.Tokens / (row.Duration + 0.1)}) + response.Tokens = append(response.Tokens, StatPoint{row.Dt, row.Tokens / 1000.0}) + response.Round = append(response.Round, StatPoint{row.Dt, row.Round}) + response.ThumbUp = append(response.ThumbUp, StatPoint{row.Dt, row.ThumbUp}) + } + + return response, nil +}