Files
ragflow/cmd/server_main.go
dripsmvcp 3d7adf2193 feat[Go]: implement GET /plugin/tools (issue #15240) (#15570)
## Summary

Port the Python `GET /v1/plugin/tools` endpoint to the Go API server.
Listed in the Go-API port checklist of #15240.

Returns the metadata of every embedded LLM tool plugin in the same JSON
shape the Python endpoint emits (camelCase keys preserved), so existing
frontends bind to the Go server without changes.
2026-06-08 11:53:19 +08:00

325 lines
11 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package main
import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"ragflow/internal/common"
"ragflow/internal/server"
"ragflow/internal/server/local"
"ragflow/internal/storage"
"ragflow/internal/utility"
"strings"
"syscall"
"time"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"ragflow/internal/cache"
"ragflow/internal/dao"
"ragflow/internal/engine"
"ragflow/internal/handler"
"ragflow/internal/router"
"ragflow/internal/service"
"ragflow/internal/service/nlp"
"ragflow/internal/tokenizer"
)
// printHelp writes the command-line usage text to standard error. It is
// installed as flag.Usage, so it runs for -h/--help and on flag parse errors.
// The program name shown in the text is taken from os.Args[0].
func printHelp() {
	prog := os.Args[0]
	out := os.Stderr

	fmt.Fprintf(out, "Usage: %s [OPTIONS]\n\n", prog)

	// Fixed lines that do not mention the program name.
	for _, text := range []string{
		"RAGFlow Server - Open-source RAG engine based on deep document understanding\n\n",
		"Options:\n",
		" -p, --port int\tServer port (overrides config file)\n",
		" -h, --help \tShow this help message and exit\n",
		"\nExamples:\n",
	} {
		fmt.Fprint(out, text)
	}

	// Example invocations, each parameterized by the program name.
	for _, format := range []string{
		" %s # Start server with config file port\n",
		" %s -p 8080 # Start server on port 8080\n",
		" %s --port 8080 # Start server on port 8080\n",
	} {
		fmt.Fprintf(out, format, prog)
	}
}
// main is the entry point of the RAGFlow server binary. It parses the command
// line, loads configuration, initializes the shared subsystems in a fixed
// order (logger, config, database, doc engine, Redis cache, storage factory,
// server variables, admin status, tokenizer, query builder) and then hands
// control to startServer.
func main() {
	// Both spellings of the port flag store into the same variable.
	var cliPort int
	flag.IntVar(&cliPort, "port", 0, "Server port (overrides config file)")
	flag.IntVar(&cliPort, "p", 0, "Server port (shorthand, overrides config file)")
	flag.Usage = printHelp
	flag.Parse()

	// Bring the logger up at "info" first; the configured level is not known
	// until the configuration has been loaded below.
	if err := common.Init("info", "server_main.log"); err != nil {
		panic(fmt.Sprintf("Failed to initialize logger: %v", err))
	}

	// Load configuration.
	if err := server.Init(""); err != nil {
		common.Fatal("Failed to initialize config", zap.Error(err))
	}
	cfg := server.GetConfig()

	// A positive command-line port takes precedence over the configured one.
	if cliPort > 0 {
		cfg.Server.Port = cliPort
		common.Info("Port overridden by command line argument", zap.Int("port", cliPort))
	}
	if cfg.Server.Port == 0 {
		common.Fatal("Server port is not configured. Please specify via --port flag or config file.")
	}

	// Re-initialize the logger with the configured level, falling back to
	// "info" when the configuration leaves it empty.
	logLevel := cfg.Log.Level
	if logLevel == "" {
		logLevel = "info"
	}
	if err := common.Init(logLevel, "server_main.log"); err != nil {
		common.Error("Failed to reinitialize logger", err)
	}
	server.SetLogger(common.Logger)

	// Record the effective level back into the config when none was set.
	if cfg.Log.Level == "" {
		cfg.Log.Level = common.GetLevel()
	}

	common.Info("Server mode", zap.String("mode", cfg.Server.Mode))

	// Dump every configuration setting.
	server.PrintAll()

	// Database.
	if err := dao.InitDB(); err != nil {
		common.Fatal("Failed to initialize database", zap.Error(err))
	}

	// Doc engine.
	if err := engine.Init(&cfg.DocEngine); err != nil {
		common.Fatal("Failed to initialize doc engine", zap.Error(err))
	}
	defer engine.Close()

	// Redis cache.
	if err := cache.Init(&cfg.Redis); err != nil {
		common.Fatal("Failed to initialize Redis", zap.Error(err))
	}
	defer cache.Close()

	// Storage factory.
	if err := storage.InitStorageFactory(); err != nil {
		common.Fatal("Failed to initialize storage factory", zap.Error(err))
	}

	// Runtime server variables are loaded through the cache, so this step has
	// to come after cache.Init. A failure here is only logged as a warning.
	if err := server.InitVariables(cache.Get()); err != nil {
		common.Warn("Failed to initialize server variables from Redis, using defaults", zap.String("error", err.Error()))
	}

	// Admin status starts out as unavailable (1) until a heartbeat succeeds.
	local.InitAdminStatus(1, "admin server not connected")

	// Tokenizer (rag_analyzer): the dictionary directory comes from
	// RAGFLOW_DICT_PATH when set, otherwise from the built-in default.
	dictDir := "/usr/share/infinity/resource"
	if fromEnv := os.Getenv("RAGFLOW_DICT_PATH"); fromEnv != "" {
		dictDir = fromEnv
	}
	tokCfg := &tokenizer.PoolConfig{
		DictPath: dictDir,
	}
	if err := tokenizer.Init(tokCfg); err != nil {
		common.Fatal("Failed to initialize tokenizer", zap.Error(err))
	}
	defer tokenizer.Close()

	// The global query builder is initialized from the tokenizer's DictPath
	// so that both use the same dictionary directory.
	if err := nlp.InitQueryBuilderFromTokenizer(tokCfg.DictPath); err != nil {
		common.Fatal("Failed to initialize query builder", zap.Error(err))
	}

	startServer(cfg)

	common.Info("Server exited")
}
// startServer builds the service, handler and router layers, serves HTTP on
// config.Server.Port, starts the heartbeat reporter and then blocks until
// SIGINT, SIGTERM, SIGQUIT or SIGUSR2 is received. On a signal it shuts the
// HTTP server down, allowing up to 30 seconds for the shutdown to complete.
//
// config supplies the server mode ("release" selects gin's release mode, any
// other value selects debug mode) and the listening port.
func startServer(config *server.Config) {
	// Set Gin mode
	if config.Server.Mode == "release" {
		gin.SetMode(gin.ReleaseMode)
	} else {
		gin.SetMode(gin.DebugMode)
	}

	// Initialize service layer
	userService := service.NewUserService()
	documentService := service.NewDocumentService()
	datasetsService := service.NewDatasetService()
	knowledgebaseService := service.NewKnowledgebaseService()
	metadataService := service.NewMetadataService()
	chunkService := service.NewChunkService()
	llmService := service.NewLLMService()
	tenantService := service.NewTenantService()
	chatService := service.NewChatService()
	chatSessionService := service.NewChatSessionService()
	systemService := service.NewSystemService()
	connectorService := service.NewConnectorService()
	searchService := service.NewSearchService()
	fileService := service.NewFileService()
	memoryService := service.NewMemoryService()
	mcpService := service.NewMCPService()
	modelProviderService := service.NewModelProviderService()

	// Doc engine handle shared by the skill search and Dify retrieval handlers
	docEngine := engine.Get()

	// Initialize handler layer
	authHandler := handler.NewAuthHandler()
	userHandler := handler.NewUserHandler(userService)
	tenantHandler := handler.NewTenantHandler(tenantService, userService, knowledgebaseService)
	documentHandler := handler.NewDocumentHandler(documentService, datasetsService)
	datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService)
	systemHandler := handler.NewSystemHandler(systemService)
	knowledgebaseHandler := handler.NewKnowledgebaseHandler(knowledgebaseService, userService, documentService)
	chunkHandler := handler.NewChunkHandler(chunkService, userService)
	llmHandler := handler.NewLLMHandler(llmService, userService)
	chatHandler := handler.NewChatHandler(chatService, userService)
	chatSessionHandler := handler.NewChatSessionHandler(chatSessionService, userService)
	connectorHandler := handler.NewConnectorHandler(connectorService, userService)
	searchHandler := handler.NewSearchHandler(searchService, userService)
	fileHandler := handler.NewFileHandler(fileService, userService)
	memoryHandler := handler.NewMemoryHandler(memoryService)
	mcpHandler := handler.NewMCPHandler(mcpService)
	skillSearchHandler := handler.NewSkillSearchHandler(docEngine)
	providerHandler := handler.NewProviderHandler(userService, modelProviderService)
	agentHandler := handler.NewAgentHandler(service.NewAgentService(), fileService)
	relatedQuestionsHandler := handler.NewSearchbotHandler(
		searchService,
		tenantService,
		&handler.SearchbotRealLLM{Svc: modelProviderService},
	)
	pluginHandler := handler.NewPluginHandler(service.NewPluginService())

	// Dify retrieval handler
	docDAO := dao.NewDocumentDAO()
	retrievalService := nlp.NewRetrievalService(docEngine, docDAO)
	difyRetrievalHandler := handler.NewDifyRetrievalHandler(
		knowledgebaseService,
		modelProviderService,
		metadataService,
		retrievalService,
		docDAO,
		docEngine,
	)

	// Initialize router
	r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, skillSearchHandler, providerHandler, agentHandler, relatedQuestionsHandler, difyRetrievalHandler, pluginHandler)

	// Create Gin engine
	ginEngine := gin.New()

	// Middleware: request logging is enabled only when the mode is exactly
	// "debug"; panic recovery is always installed.
	if config.Server.Mode == "debug" {
		ginEngine.Use(gin.Logger())
	}
	ginEngine.Use(gin.Recovery())

	// Setup routes
	r.Setup(ginEngine)

	// Create HTTP server with timeouts to prevent slow clients from blocking shutdown
	addr := fmt.Sprintf(":%d", config.Server.Port)
	srv := &http.Server{
		Addr:              addr,
		Handler:           ginEngine,
		ReadHeaderTimeout: 10 * time.Second,
		ReadTimeout:       60 * time.Second,
		WriteTimeout:      120 * time.Second,
		IdleTimeout:       120 * time.Second,
	}

	// Start server in a goroutine so this function can go on to wait for signals.
	go func() {
		common.Info(
			"\n ____ ___ ______ ______ __\n" +
				" / __ \\ / | / ____// ____// /____ _ __\n" +
				" / /_/ // /| | / / __ / /_ / // __ \\| | /| / /\n" +
				" / _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ /\n" +
				" /_/ |_|/_/ |_|\\____//_/ /_/ \\____/ |__/|__/\n",
		)
		common.Info(fmt.Sprintf("RAGFlow Go Version: %s", utility.GetRAGFlowVersion()))
		common.Info(fmt.Sprintf("Server starting on port: %d", config.Server.Port))
		// http.ErrServerClosed is the normal result of srv.Shutdown and is not a failure.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			common.Fatal("Failed to start server", zap.Error(err))
		}
	}()

	// Get local IP address for heartbeat reporting
	localIP, err := utility.GetLocalIP()
	if err != nil {
		// Attach the underlying error so the cause is visible in the log.
		common.Fatal("fail to get local ip address", zap.Error(err))
	}

	// Initialize and start heartbeat reporter to admin server
	heartbeatService := service.NewHeartbeatSender(
		common.Logger,
		common.ServerTypeAPI,
		fmt.Sprintf("ragflow-server-%d", config.Server.Port),
		localIP,
		config.Server.Port,
	)
	if err = heartbeatService.InitHTTPClient(); err != nil {
		common.Warn("Failed to initialize heartbeat service", zap.Error(err))
	} else {
		// Start heartbeat reporter with a 3 second interval.
		// NOTE(review): the previous comment said 30 seconds while the code
		// passes 3*time.Second; the code value is kept — confirm the intent.
		heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() {
			// Use a closure-local error. The task presumably runs on its own
			// goroutine, so assigning to startServer's err here would race
			// with the srv.Shutdown error check below.
			if hbErr := heartbeatService.SendHeartbeat(); hbErr != nil {
				local.SetAdminStatus(1, hbErr.Error())
				return
			}
			local.SetAdminStatus(0, "")
		})
		heartbeatReporter.Start()
		defer heartbeatReporter.Stop()
	}

	// Wait for interrupt signal to gracefully shutdown. The channel is
	// buffered because signal.Notify does not block when delivering.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2)
	sig := <-quit
	common.Info(fmt.Sprintf("Receives %s signal to shutdown server", strings.ToUpper(sig.String())))
	common.Info("Shutting down server...")

	// Create context with timeout for graceful shutdown
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Shutdown server
	if err = srv.Shutdown(ctx); err != nil {
		common.Fatal("Server forced to shutdown", zap.Error(err))
	}
}