Update go server (#13589)

### What problem does this PR solve?

1. Add more CLI command
2. Add some license hooks

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Refactoring

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2026-03-13 14:41:02 +08:00
committed by GitHub
parent ef94a9c291
commit 5c955a31cc
13 changed files with 200 additions and 47 deletions

View File

@@ -95,6 +95,7 @@ sql_command: login_user
| list_server_configs
| show_fingerprint
| set_license
| set_license_config
| show_license
| check_license
| benchmark
@@ -185,6 +186,7 @@ SERVER: "SERVER"i
FINGERPRINT: "FINGERPRINT"i
LICENSE: "LICENSE"i
CHECK: "CHECK"i
CONFIG: "CONFIG"i
login_user: LOGIN USER quoted_string ";"
list_services: LIST SERVICES ";"
@@ -232,6 +234,7 @@ list_environments: LIST ENVS ";"
show_fingerprint: SHOW FINGERPRINT ";"
set_license: SET LICENSE quoted_string ";"
set_license_config: SET LICENSE CONFIG NUMBER NUMBER ";"
show_license: SHOW LICENSE ";"
check_license: CHECK LICENSE ";"
@@ -496,6 +499,11 @@ class RAGFlowCLITransformer(Transformer):
license = items[2].children[0].strip("'\"")
return {"type": "set_license", "license": license}
def set_license_config(self, items):
value1: int = int(items[3])
value2: int = int(items[4])
return {"type": "set_license_config", "value1": value1, "value2": value2}
def show_license(self, items):
return {"type": "show_license"}

View File

@@ -604,6 +604,18 @@ class RAGFlowClient:
else:
print(f"Fail to set license, code: {res_json['code']}, message: {res_json['message']}")
def set_license_config(self, command):
if self.server_type != "admin":
print("This command is only allowed in ADMIN mode")
value1 = command["value1"]
value2 = command["value2"]
response = self.http_client.request("POST", "/admin/license/config", json_body={"value1": value1, "value2": value2}, use_api_base=True, auth_kind="admin")
res_json = response.json()
if response.status_code == 200:
print("Set license successfully")
else:
print(f"Fail to set license, code: {res_json['code']}, message: {res_json['message']}")
def show_license(self, command):
if self.server_type != "admin":
print("This command is only allowed in ADMIN mode")
@@ -1560,6 +1572,8 @@ def run_command(client: RAGFlowClient, command_dict: dict):
client.show_fingerprint(command_dict)
case "set_license":
client.set_license(command_dict)
case "set_license_config":
client.set_license_config(command_dict)
case "show_license":
client.show_license(command_dict)
case "check_license":

View File

@@ -53,7 +53,7 @@ func main() {
flag.Parse()
// Initialize logger
if err := logger.Init("debug"); err != nil {
if err := logger.Init("info"); err != nil {
panic("failed to initialize logger: " + err.Error())
}

View File

@@ -242,12 +242,11 @@ func startServer(config *server.Config) {
} else {
// Start heartbeat reporter with 30 seconds interval
heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() {
var message string
if err, message = heartbeatService.SendHeartbeat(); err == nil {
if err = heartbeatService.SendHeartbeat(); err == nil {
local.SetAdminStatus(0, "")
} else {
local.SetAdminStatus(1, message)
logger.Warn("Failed to send heartbeat", zap.Error(err))
local.SetAdminStatus(1, err.Error())
logger.Warn(fmt.Sprintf(err.Error()))
}
})
heartbeatReporter.Start()

View File

@@ -88,6 +88,20 @@ func errorResponse(c *gin.Context, message string, code int) {
})
}
func responseWithCode(c *gin.Context, message string, httpCode int, errorCode common.ErrorCode) {
if message == "" {
c.JSON(httpCode, ErrorResponse{
Code: int(errorCode),
Message: errorCode.Message(),
})
} else {
c.JSON(httpCode, ErrorResponse{
Code: int(errorCode),
Message: message,
})
}
}
// Health health check
func (h *Handler) Health(c *gin.Context) {
c.JSON(200, gin.H{"status": "ok"})
@@ -909,6 +923,19 @@ func (h *Handler) SetLicense(c *gin.Context) {
return
}
type SetLicenseConfigHTTPRequest struct {
TimeRecordSaveInterval int64 `json:"value1" binding:"required"`
TimeRecordTaskDuration int64 `json:"value2" binding:"required"`
}
func (h *Handler) UpdateLicenseConfig(c *gin.Context) {
c.JSON(http.StatusNotImplemented, gin.H{
"code": common.CodeServerError,
"message": "method not implemented",
})
return
}
// ShowLicense to get system license
func (h *Handler) ShowLicense(c *gin.Context) {
c.JSON(http.StatusNotImplemented, gin.H{
@@ -1092,10 +1119,11 @@ func (h *Handler) Reports(c *gin.Context) {
}
// Handle the heartbeat
if err := h.service.HandleHeartbeat(&req); err != nil {
errorResponse(c, "Failed to process heartbeat: "+err.Error(), 500)
errCode, message := h.service.HandleHeartbeat(&req)
if errCode != common.CodeLicenseValid {
responseWithCode(c, message, 500, errCode)
return
}
successNoData(c, "Heartbeat received successfully")
responseWithCode(c, message, int(http.StatusOK), errCode)
}

View File

@@ -116,6 +116,7 @@ func (r *Router) Setup(engine *gin.Engine) {
protected.GET("/fingerprint", r.handler.GetFingerprint)
// License
protected.POST("/license", r.handler.SetLicense)
protected.POST("/license/config", r.handler.UpdateLicenseConfig)
protected.GET("/license", r.handler.ShowLicense)
}
}

View File

@@ -48,6 +48,7 @@ var (
type Service struct {
userDAO *dao.UserDAO
licenseDAO *dao.LicenseDAO
timeRecordDAO *dao.TimeRecordDAO
systemSettingsDAO *dao.SystemSettingsDAO
}
@@ -56,6 +57,7 @@ func NewService() *Service {
return &Service{
userDAO: dao.NewUserDAO(),
licenseDAO: dao.NewLicenseDAO(),
timeRecordDAO: dao.NewTimeRecordDAO(),
systemSettingsDAO: dao.NewSystemSettingsDAO(),
}
}
@@ -1105,19 +1107,23 @@ func (s *Service) TestSandboxConnection(providerType string, config map[string]i
}, nil
}
var heartBeatCount int64 = 0
// HandleHeartbeat handle heartbeat
func (s *Service) HandleHeartbeat(msg *common.BaseMessage) error {
func (s *Service) HandleHeartbeat(message *common.BaseMessage) (common.ErrorCode, string) {
heartBeatCount++
status := &common.BaseMessage{
ServerName: msg.ServerName,
ServerType: msg.ServerType,
Host: msg.Host,
Port: msg.Port,
Version: msg.Version,
Timestamp: msg.Timestamp,
Ext: msg.Ext,
ServerName: message.ServerName,
ServerType: message.ServerType,
Host: message.Host,
Port: message.Port,
Version: message.Version,
Timestamp: message.Timestamp,
Ext: message.Ext,
}
GlobalServerStatusStore.UpdateStatus(msg.ServerName, status)
return nil
GlobalServerStatusStore.UpdateStatus(message.ServerName, status)
return common.CodeLicenseValid, ""
}
// InitDefaultAdmin initialize default admin user

View File

@@ -19,22 +19,64 @@ package common
type ErrorCode int
const (
CodeSuccess ErrorCode = 0
CodeNotEffective ErrorCode = 10
CodeExceptionError ErrorCode = 100
CodeArgumentError ErrorCode = 101
CodeDataError ErrorCode = 102
CodeOperatingError ErrorCode = 103
CodeTimeoutError ErrorCode = 104
CodeConnectionError ErrorCode = 105
CodeRunning ErrorCode = 106
CodeResourceExhausted ErrorCode = 107
CodePermissionError ErrorCode = 108
CodeAuthenticationError ErrorCode = 109
CodeBadRequest ErrorCode = 400
CodeUnauthorized ErrorCode = 401
CodeForbidden ErrorCode = 403
CodeNotFound ErrorCode = 404
CodeConflict ErrorCode = 409
CodeServerError ErrorCode = 500
CodeSuccess ErrorCode = 0
CodeNotEffective ErrorCode = 10
CodeExceptionError ErrorCode = 100
CodeArgumentError ErrorCode = 101
CodeDataError ErrorCode = 102
CodeOperatingError ErrorCode = 103
CodeTimeoutError ErrorCode = 104
CodeConnectionError ErrorCode = 105
CodeRunning ErrorCode = 106
CodeResourceExhausted ErrorCode = 107
CodePermissionError ErrorCode = 108
CodeAuthenticationError ErrorCode = 109
CodeLicenseValid ErrorCode = 320
CodeLicenseInactiveError ErrorCode = 321
CodeLicenseExpiredError ErrorCode = 322
CodeLicenseDigestError ErrorCode = 323
CodeLicenseTimeRollback ErrorCode = 324
CodeLicenseNotFound ErrorCode = 325
CodeLicenseUnexpectedError ErrorCode = 326
CodeBadRequest ErrorCode = 400
CodeUnauthorized ErrorCode = 401
CodeForbidden ErrorCode = 403
CodeNotFound ErrorCode = 404
CodeConflict ErrorCode = 409
CodeServerError ErrorCode = 500
)
var errorMessages = map[ErrorCode]string{
CodeSuccess: "Success",
CodeNotEffective: "Not effective",
CodeExceptionError: "System exception",
CodeArgumentError: "Invalid argument",
CodeDataError: "Data error",
CodeOperatingError: "Operation error",
CodeTimeoutError: "Timeout",
CodeConnectionError: "Connection error",
CodeRunning: "System running",
CodeResourceExhausted: "Resource exhausted",
CodePermissionError: "Permission denied",
CodeAuthenticationError: "Authentication failed",
CodeLicenseValid: "License valid",
CodeLicenseInactiveError: "License inactive",
CodeLicenseExpiredError: "License expired",
CodeLicenseDigestError: "License digest error",
CodeLicenseTimeRollback: "License time rollback detected",
CodeLicenseNotFound: "License not found",
CodeLicenseUnexpectedError: "Unexpected license error",
CodeBadRequest: "Bad request",
CodeUnauthorized: "Unauthorized",
CodeForbidden: "Forbidden",
CodeNotFound: "Resource not found",
CodeConflict: "Resource conflict",
CodeServerError: "Internal server error",
}
func (e ErrorCode) Message() string {
if msg, ok := errorMessages[e]; ok {
return msg
}
return "Unknown error"
}

View File

@@ -90,6 +90,7 @@ func InitDB() error {
NowFunc: func() time.Time {
return time.Now().Local()
},
TranslateError: true,
})
if err != nil {
return fmt.Errorf("failed to connect database: %w", err)

View File

@@ -64,3 +64,40 @@ func (dao *TimeRecordDAO) GetByID(id int64) (*model.TimeRecord, error) {
}
return &record, nil
}
// GetAll retrieves all records
func (dao *TimeRecordDAO) GetAll() ([]*model.TimeRecord, error) {
var records []*model.TimeRecord
err := DB.Find(&records).Error
return records, err
}
// KeepLatest keeps the latest N records and deletes older ones
func (dao *TimeRecordDAO) KeepLatest(count int64) error {
// Step 1: Get the maximum ID
var maxID int64
if err := DB.Model(&model.TimeRecord{}).Select("COALESCE(MAX(id), 0)").Scan(&maxID).Error; err != nil {
return err
}
// If no records or count is 0, nothing to delete
if maxID == 0 || count <= 0 {
return nil
}
// Step 2: Calculate the threshold ID
thresholdID := maxID - count
// If threshold is less than 0, keep all records
if thresholdID <= 0 {
return nil
}
// Step 3: Delete records with ID <= threshold
return DB.Where("id <= ?", thresholdID).Delete(&model.TimeRecord{}).Error
}
// DeleteAll deletes all records
func (dao *TimeRecordDAO) DeleteAll() error {
return DB.Where("1=1").Delete(&model.TimeRecord{}).Error
}

View File

@@ -74,7 +74,7 @@ func (h *AuthHandler) AuthMiddleware() gin.HandlerFunc {
if !local.IsAdminAvailable() {
license := local.GetAdminStatus()
errMsg := fmt.Sprintf("server license %s, check admin server status", license.Reason)
errMsg := fmt.Sprintf("server license %s", license.Reason)
logger.Warn(errMsg)
c.JSON(http.StatusServiceUnavailable, gin.H{
"code": common.CodeUnauthorized,

View File

@@ -18,6 +18,7 @@ package service
import (
"encoding/json"
"errors"
"fmt"
"ragflow/internal/common"
"ragflow/internal/server"
@@ -69,19 +70,19 @@ func (h *HeartbeatSender) InitHTTPClient() error {
h.logger.Info("Heartbeat HTTP client initialized",
zap.String("admin_host", adminConfig.Host),
zap.Int("admin_port", adminConfig.Port+2),
zap.Int("admin_port", adminConfig.Port),
)
return nil
}
// SendHeartbeat sends a heartbeat message to the admin server
func (h *HeartbeatSender) SendHeartbeat() (error, string) {
func (h *HeartbeatSender) SendHeartbeat() error {
if h.attemptCount < 10 {
if h.lastSuccess {
h.attemptCount++
return nil, ""
return nil
}
}
h.attemptCount = 0
@@ -90,7 +91,7 @@ func (h *HeartbeatSender) SendHeartbeat() (error, string) {
if h.client == nil {
if err := h.InitHTTPClient(); err != nil {
h.logger.Error("Failed to initialize HTTP client", zap.Error(err))
return err, "internal error, fail to initialize HTTP client"
return err
}
}
@@ -109,19 +110,26 @@ func (h *HeartbeatSender) SendHeartbeat() (error, string) {
jsonData, err := json.Marshal(message)
if err != nil {
h.logger.Error("Failed to marshal heartbeat message", zap.Error(err))
return err, "fail to parse the message"
return err
}
resp, err := h.client.PostJSON("/api/v1/admin/reports", jsonData)
if err != nil {
return err, "can't connect with admin server"
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
errMsg := fmt.Errorf("Heartbeat request failed with status code: %d", resp.StatusCode)
h.logger.Warn(errMsg.Error())
return errMsg, errMsg.Error()
// extract the Code and Message field of the response
var responseBody map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&responseBody)
if err != nil {
return err
}
responseCode := common.ErrorCode(responseBody["code"].(float64))
if responseCode != common.CodeLicenseValid {
return errors.New(responseCode.Message())
}
}
h.logger.Debug("Heartbeat sent successfully",
@@ -131,5 +139,5 @@ func (h *HeartbeatSender) SendHeartbeat() (error, string) {
h.lastSuccess = true
return nil, ""
return nil
}

View File

@@ -19,6 +19,7 @@ package utility
import (
"fmt"
"os"
"time"
)
// GetProjectBaseDirectory returns the current working directory.
@@ -78,3 +79,11 @@ func ParseInt64(s string) int64 {
fmt.Sscanf(s, "%d", &result)
return result
}
// FormatTime formats time for display
func FormatTime(t time.Time) string {
if t.IsZero() {
return "N/A (Perpetual)"
}
return t.Format("2006-01-02 15:04:05")
}