From 7d64a78f830f9a0eb0dfc0397dfc7dadff7aa0ae Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Thu, 2 Jul 2026 21:21:10 +0800 Subject: [PATCH] Go: unify three services into one binary (#16462) ### Summary Plan to start api_server, admin_server and ingestor in one binary: - ./ragflow_main --admin - ./ragflow_main --api - ./ragflow_main --ingestor --------- Signed-off-by: Jin Hai --- build.sh | 60 +-- cmd/admin_server.go | 224 ---------- cmd/ingestor.go | 259 ----------- cmd/ragflow-cli.go | 14 +- cmd/{server_main.go => ragflow_main.go} | 563 ++++++++++++++++++++---- docker/.env | 1 + docker/entrypoint.sh | 103 ++--- docker/service_conf.yaml.template | 2 + internal/development.md | 35 +- internal/engine/elasticsearch/client.go | 29 +- internal/engine/global.go | 5 +- internal/engine/infinity/chunk.go | 34 +- internal/engine/infinity/metadata.go | 42 +- internal/engine/infinity/sql.go | 16 +- internal/engine/infinity/sql_test.go | 18 +- internal/engine/nats/nats.go | 18 +- internal/server/config.go | 4 +- internal/server/local_variables.go | 53 +++ internal/utility/path.go | 51 +++ 19 files changed, 761 insertions(+), 770 deletions(-) delete mode 100644 cmd/admin_server.go delete mode 100644 cmd/ingestor.go rename cmd/{server_main.go => ragflow_main.go} (52%) create mode 100644 internal/server/local_variables.go diff --git a/build.sh b/build.sh index b49b836653..42082b8a7d 100755 --- a/build.sh +++ b/build.sh @@ -14,9 +14,7 @@ PROJECT_ROOT="$SCRIPT_DIR" # Build directories CPP_DIR="$PROJECT_ROOT/internal/cpp" BUILD_DIR="$CPP_DIR/cmake-build-release" -RAGFLOW_SERVER_BINARY="$PROJECT_ROOT/bin/ragflow_server" -ADMIN_SERVER_BINARY="$PROJECT_ROOT/bin/admin_server" -INGESTOR_BINARY="$PROJECT_ROOT/bin/ingestor" +RAGFLOW_MAIN_BINARY="$PROJECT_ROOT/bin/ragflow_main" RAGFLOW_CLI_BINARY="$PROJECT_ROOT/bin/ragflow-cli" # Strip symbols from Go binaries (set via --strip / -s) @@ -46,7 +44,10 @@ _seed_from_system() { local dep_dir="${HOME}/ragflow-native-libs/${dep_name}" local sys_dir="${SYSTEM_DEPS}/${dep_name}" + echo "check if dep ${dep_name} exists in ${dep_dir} or ${sys_dir}" + if [ -d "$dep_dir" ]; then + echo " ${dep_name} → ${dep_dir} (user cache)" return 0 # already cached fi if [ -d "$sys_dir" ]; then @@ -55,6 +56,7 @@ _seed_from_system() { cp -r "$sys_dir" "$dep_dir" return 0 fi + echo " ${dep_name} not found in system or user cache" return 1 } @@ -299,39 +301,21 @@ build_go() { local strip_flags=() [ -n "$STRIP_SYMBOLS" ] && strip_flags=(-ldflags="-s -w") - echo "Building RAGFlow binary: $RAGFLOW_SERVER_BINARY, $ADMIN_SERVER_BINARY, $INGESTOR_BINARY, and $RAGFLOW_CLI_BINARY" + echo "Building RAGFlow binary: $RAGFLOW_MAIN_BINARY, and $RAGFLOW_CLI_BINARY" GOPROXY=${GOPROXY:-https://goproxy.cn,https://proxy.golang.org,direct} CGO_ENABLED=1 \ CGO_CFLAGS="$CGO_CFLAGS" CGO_LDFLAGS="$CGO_LDFLAGS" \ - go build "${strip_flags[@]}" -o "$RAGFLOW_SERVER_BINARY" cmd/server_main.go - GOPROXY=${GOPROXY:-https://goproxy.cn,https://proxy.golang.org,direct} CGO_ENABLED=1 \ - CGO_CFLAGS="$CGO_CFLAGS" CGO_LDFLAGS="$CGO_LDFLAGS" \ - go build "${strip_flags[@]}" -o "$ADMIN_SERVER_BINARY" cmd/admin_server.go - GOPROXY=${GOPROXY:-https://goproxy.cn,https://proxy.golang.org,direct} CGO_ENABLED=1 \ - CGO_CFLAGS="$CGO_CFLAGS" CGO_LDFLAGS="$CGO_LDFLAGS" \ - go build "${strip_flags[@]}" -o "$INGESTOR_BINARY" cmd/ingestor.go + go build "${strip_flags[@]}" -o "$RAGFLOW_MAIN_BINARY" cmd/ragflow_main.go GOPROXY=${GOPROXY:-https://goproxy.cn,https://proxy.golang.org,direct} CGO_ENABLED=1 \ CGO_CFLAGS="$CGO_CFLAGS" CGO_LDFLAGS="$CGO_LDFLAGS" \ go build "${strip_flags[@]}" -o "$RAGFLOW_CLI_BINARY" cmd/ragflow-cli.go - if [ ! -f "$RAGFLOW_SERVER_BINARY" ]; then - echo -e "${RED}Error: Failed to build RAGFlow server binary${NC}" + if [ ! -f "$RAGFLOW_MAIN_BINARY" ]; then + echo -e "${RED}Error: Failed to build RAGFlow main binary${NC}" exit 1 fi - if [ ! -f "$ADMIN_SERVER_BINARY" ]; then - echo -e "${RED}Error: Failed to build Admin server binary${NC}" - exit 1 - fi - - if [ ! -f "$INGESTOR_BINARY" ]; then - echo -e "${RED}Error: Failed to build Ingestor binary${NC}" - exit 1 - fi - - echo -e "${GREEN}✓ Go ragflow_server built successfully: $RAGFLOW_SERVER_BINARY${NC}" - echo -e "${GREEN}✓ Go admin_server built successfully: $ADMIN_SERVER_BINARY${NC}" + echo -e "${GREEN}✓ Go ragflow_main built successfully: $RAGFLOW_MAIN_BINARY${NC}" echo -e "${GREEN}✓ Go ragflow-cli built successfully: $RAGFLOW_CLI_BINARY${NC}" - echo -e "${GREEN}✓ Go ingestor built successfully: $INGESTOR_BINARY${NC}" } # Configure CGO flags for native libraries (office_oxide, pdfium, pdf_oxide). @@ -421,9 +405,7 @@ clean() { print_section "Cleaning build artifacts" rm -rf "$BUILD_DIR" - rm -f "$RAGFLOW_SERVER_BINARY" - rm -f "$ADMIN_SERVER_BINARY" - rm -f "$INGESTOR_BINARY" + rm -f "$RAGFLOW_MAIN_BINARY" rm -f "$RAGFLOW_CLI_BINARY" echo -e "${GREEN}✓ Build artifacts cleaned${NC}" @@ -431,16 +413,8 @@ clean() { # Run the server run() { - if [ ! -f "$ADMIN_SERVER_BINARY" ]; then - echo -e "${RED}Error: $ADMIN_SERVER_BINARY not found. Build first with --all or --go${NC}" - exit 1 - fi - if [ ! -f "$RAGFLOW_SERVER_BINARY" ]; then - echo -e "${RED}Error: $RAGFLOW_SERVER_BINARY not found. Build first with --all or --go${NC}" - exit 1 - fi - if [ ! -f "$INGESTOR_BINARY" ]; then - echo -e "${RED}Error: $INGESTOR_BINARY not found. Build first with --all or --go${NC}" + if [ ! -f "$RAGFLOW_MAIN_BINARY" ]; then + echo -e "${RED}Error: $RAGFLOW_MAIN_BINARY not found. Build first with --all or --go${NC}" exit 1 fi @@ -449,7 +423,7 @@ run() { # admin_server must be running before ragflow_server, otherwise ragflow_server's # heartbeats to admin will error out (see internal/development.md). print_section "Starting admin server (background)" - "$ADMIN_SERVER_BINARY" & + "$RAGFLOW_MAIN_BINARY" --admin & ADMIN_PID=$! trap 'kill "$ADMIN_PID" 2>/dev/null || true' EXIT INT TERM @@ -458,13 +432,13 @@ run() { sleep 1 print_section "Starting ingestor (background)" - "$INGESTOR_BINARY" & + "$RAGFLOW_MAIN_BINARY" --ingestor & INGESTOR_PID=$! trap 'kill "$INGESTOR_PID" 2>/dev/null || true' EXIT INT TERM sleep 1 print_section "Starting RAGFlow server (foreground)" - "$RAGFLOW_SERVER_BINARY" + "$RAGFLOW_MAIN_BINARY" -- api } # Show help @@ -566,7 +540,7 @@ main() { build_cpp build_go echo -e "\n${GREEN}=== Build completed successfully! ===${NC}" - echo "Binary: $RAGFLOW_SERVER_BINARY, $ADMIN_SERVER_BINARY, $INGESTOR_BINARY, $RAGFLOW_CLI_BINARY" + echo "Binary: $RAGFLOW_MAIN_BINARY, $RAGFLOW_CLI_BINARY" ;; *) echo -e "${RED}Unknown option: $1${NC}" diff --git a/cmd/admin_server.go b/cmd/admin_server.go deleted file mode 100644 index 487b5d1a7b..0000000000 --- a/cmd/admin_server.go +++ /dev/null @@ -1,224 +0,0 @@ -//go:build ignore - -// -// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package main - -import ( - "context" - "errors" - "flag" - "fmt" - "net/http" - "os" - "os/signal" - "ragflow/internal/common" - "ragflow/internal/engine" - "ragflow/internal/engine/redis" - "ragflow/internal/utility" - "syscall" - "time" - - "github.com/gin-gonic/gin" - "go.uber.org/zap" - - "ragflow/internal/admin" - "ragflow/internal/dao" - "ragflow/internal/server" -) - -func printHelp() { - fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS]\n\n", os.Args[0]) - fmt.Fprintf(os.Stderr, "RAGFlow Admin Server\n\n") - fmt.Fprintf(os.Stderr, "Options:\n") - fmt.Fprintf(os.Stderr, " --config string\tPath to configuration file\n") - fmt.Fprintf(os.Stderr, " -v, --version \tPrint version information and exit\n") - fmt.Fprintf(os.Stderr, " --debug \tEnable debug-level logging\n") - fmt.Fprintf(os.Stderr, " --init-superuser\tInitialize superuser account\n") - fmt.Fprintf(os.Stderr, " -h, --help \tShow this help message and exit\n") -} - -func main() { - var configPath string - flag.StringVar(&configPath, "config", "", "Path to configuration file") - var debugFlag bool - flag.BoolVar(&debugFlag, "debug", false, "Enable debug-level logging") - var versionFlag bool - flag.BoolVar(&versionFlag, "version", false, "Print version information and exit") - var initSuperuser bool - flag.BoolVar(&initSuperuser, "init-superuser", false, "Initialize superuser account") - - // Custom help message - flag.Usage = printHelp - - flag.Parse() - - // Handle --version flag: print version and exit immediately - if versionFlag { - fmt.Printf("RAGFlow version: %s\n", utility.GetRAGFlowVersion()) - return - } - - // Initialize logger - if err := common.Init("info", common.FileOutput{Path: "admin_server.log"}); err != nil { - panic("failed to initialize logger: " + err.Error()) - } - - // Initialize configuration - if err := server.Init(configPath); err != nil { - common.Error("Failed to initialize configuration", err) - os.Exit(1) - } - - cfg := server.GetConfig() - - // Reinitialize logger with configured level if different - logLevel := cfg.Log.Level - if logLevel == "" { - logLevel = "info" - } - - if debugFlag { - logLevel = "debug" - } - - fileOut := common.FileOutput{ - Path: "admin_server.log", - MaxSize: cfg.Log.MaxSize, - MaxBackups: cfg.Log.MaxBackups, - MaxAge: cfg.Log.MaxAge, - Compress: common.ResolveCompress(cfg.Log.Compress), - } - if cfg.Log.Path != "" { - fileOut.Path = cfg.Log.Path - } - if err := common.Init(logLevel, fileOut); err != nil { - common.Error("Failed to reinitialize logger with configured level", err) - } - - // Set logger for server package - server.SetLogger(common.Logger) - - common.Info("Server mode", zap.String("mode", cfg.Server.Mode)) - - // Set Gin mode - if cfg.Server.Mode == "release" { - gin.SetMode(gin.ReleaseMode) - } else { - gin.SetMode(gin.DebugMode) - } - - // Initialize database - if err := dao.InitDB(); err != nil { - common.Error("Failed to initialize database", err) - os.Exit(1) - } - - // Initialize doc engine - if err := engine.Init(&cfg.DocEngine); err != nil { - common.Fatal("Failed to initialize doc engine", zap.Error(err)) - } - defer engine.Close() - - // Initialize Redis cache - if err := redis.Init(&cfg.Redis); err != nil { - common.Fatal("Failed to initialize Redis", zap.Error(err)) - } - defer redis.Close() - - if err := engine.InitMessageQueueEngine(cfg.TaskExecutor.MessageQueueType); err != nil { - common.Error("Failed to initialize message queue engine", err) - } - - // Initialize server variables (runtime variables that can change during operation) - // This must be done after Cache is initialized - if err := server.InitVariables(redis.Get()); err != nil { - common.Warn("Failed to initialize server variables from Redis, using defaults", zap.String("error", err.Error())) - } - - adminService := admin.NewService() - adminHandler := admin.NewHandler(adminService) - - if initSuperuser { - // Initialize default admin user - if err := adminService.InitDefaultAdmin(); err != nil { - common.Error("Failed to initialize default admin user", err) - } - - } - - // Initialize router - r := admin.NewRouter(adminHandler) - - // Create Gin engine - ginEngine := gin.New() - - // Middleware - ginEngine.Use(common.GinLogger()) - ginEngine.Use(gin.Recovery()) - - // Setup routes - r.Setup(ginEngine) - - // Create HTTP server - addr := fmt.Sprintf(":%d", cfg.Admin.Port) - srv := &http.Server{ - Addr: addr, - Handler: ginEngine, - } - - // Print all configuration settings - server.PrintAll() - - // Print RAGFlow Admin logo - common.Info("" + - "\n ____ ___ ______________ ___ __ _ \n" + - " / __ \\/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___ \n" + - " / /_/ / /| |/ / __/ /_ / / __ \\ | /| / / / /| |/ __ / __ `__ \\/ / __ \\ \n" + - " / _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / /\n" + - " /_/ |_/_/ |_\\____/_/ /_/\\____/|__/|__/ /_/ |_\\__,_/_/ /_/ /_/_/_/ /_/ \n") - - // Print RAGFlow version - common.Info(fmt.Sprintf("RAGFlow admin version: %s", utility.GetRAGFlowVersion())) - - // Start HTTP server in a goroutine - go func() { - common.Info(fmt.Sprintf("Starting RAGFlow admin HTTP server on port: %d", cfg.Admin.Port)) - if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - common.Fatal("Failed to start server", zap.Error(err)) - } - }() - - // Wait for interrupt signal to gracefully shutdown - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2) - sig := <-quit - - common.Info("Received signal", zap.String("signal", sig.String())) - common.Info("Shutting down RAGFlow HTTP server...") - - // Create context with timeout for graceful shutdown - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Shutdown HTTP server - if err := srv.Shutdown(ctx); err != nil { - common.Fatal("Server forced to shutdown", zap.Error(err)) - } - - common.Info("Admin HTTP server exited") -} diff --git a/cmd/ingestor.go b/cmd/ingestor.go deleted file mode 100644 index 200ddfd76d..0000000000 --- a/cmd/ingestor.go +++ /dev/null @@ -1,259 +0,0 @@ -//go:build ignore - -// -// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package main - -import ( - "context" - "flag" - "fmt" - "os" - "os/signal" - "ragflow/internal/engine/redis" - "ragflow/internal/ingestion" - "ragflow/internal/server/local" - "ragflow/internal/service" - "ragflow/internal/service/nlp" - "ragflow/internal/tokenizer" - "ragflow/internal/utility" - "syscall" - "time" - - "ragflow/internal/common" - "ragflow/internal/dao" - "ragflow/internal/engine" - "ragflow/internal/server" - "ragflow/internal/storage" - - "go.uber.org/zap" -) - -func printIngestionServerHelp() { - fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS]\n\n", os.Args[0]) - fmt.Fprintf(os.Stderr, "RAGFlow Ingestion Worker - Document ingestion processing\n\n") - fmt.Fprintf(os.Stderr, "Options:\n") - fmt.Fprintf(os.Stderr, " -f string\t\tPath to config file (default: auto-detect)\n") - fmt.Fprintf(os.Stderr, " --name string\t\tIngestion server name (default: \"default_ingestion\")\n") - fmt.Fprintf(os.Stderr, " --admin-host string\tAdmin server host (overrides config file)\n") - fmt.Fprintf(os.Stderr, " --admin-port int\tAdmin server port (overrides config file)\n") - fmt.Fprintf(os.Stderr, " --version \tPrint version information and exit\n") - fmt.Fprintf(os.Stderr, " --debug \tEnable debug-level logging\n") - fmt.Fprintf(os.Stderr, " -h, --help\t\tShow this help message and exit\n") - fmt.Fprintf(os.Stderr, "\nExamples:\n") - fmt.Fprintf(os.Stderr, " %s # Start with default config\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s -f /path/to/config.yaml # Start with custom config file\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s --admin-host 10.0.0.1 --admin-port 9383\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s --version \t\t# Show version and exit\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s --debug \t\t# Start with debug logging\n", os.Args[0]) -} - -func main() { - // Parse command line flags - var configPath string - var name string - var adminHost string - var adminPort int - - flag.StringVar(&configPath, "f", "", "Path to config file (overrides auto-detect)") - flag.StringVar(&name, "name", "default_ingestion", "Ingestion server name") - flag.StringVar(&adminHost, "admin-host", "", "Admin server host (overrides config file)") - flag.IntVar(&adminPort, "admin-port", 0, "Admin server port (overrides config file)") - var debugFlag bool - flag.BoolVar(&debugFlag, "debug", false, "Enable debug-level logging") - var versionFlag bool - flag.BoolVar(&versionFlag, "version", false, "Print version information and exit") - - // Custom help message - flag.Usage = printIngestionServerHelp - - flag.Parse() - - // Handle --version flag: print version and exit immediately - if versionFlag { - fmt.Printf("RAGFlow version: %s\n", utility.GetRAGFlowVersion()) - return - } - - // Initialize logger with default level - if err := common.Init("info", common.FileOutput{Path: "ingestion_server.log"}); err != nil { - panic(fmt.Sprintf("Failed to initialize logger: %v", err)) - } - - // Initialize configuration - if err := server.Init(configPath); err != nil { - common.Fatal("Failed to initialize config", zap.Error(err)) - } - - config := server.GetConfig() - - // Override admin server host with command line argument if provided - if adminHost != "" { - config.Admin.Host = adminHost - common.Info("Admin host overridden by command line argument", zap.String("admin_host", adminHost)) - } - - // Override admin server port with command line argument if provided - if adminPort > 0 { - config.Admin.Port = adminPort - common.Info("Admin port overridden by command line argument", zap.Int("admin_port", adminPort)) - } - - // Reinitialize logger with configured level if different - level := config.Log.Level - if level == "" { - level = "info" - } - - if debugFlag { - level = "debug" - } - - fileOut := common.FileOutput{ - Path: "ingestion_server.log", - MaxSize: config.Log.MaxSize, - MaxBackups: config.Log.MaxBackups, - MaxAge: config.Log.MaxAge, - Compress: common.ResolveCompress(config.Log.Compress), - } - if config.Log.Path != "" { - fileOut.Path = config.Log.Path - } - if err := common.Init(level, fileOut); err != nil { - common.Error("Failed to reinitialize logger", err) - } - server.SetLogger(common.Logger) - - common.Info("Starting RAGFlow Ingestion Worker") - - // Initialize database - if err := dao.InitDB(); err != nil { - common.Fatal("Failed to initialize database", zap.Error(err)) - } - - // Initialize doc engine - if err := engine.Init(&config.DocEngine); err != nil { - common.Fatal("Failed to initialize doc engine", zap.Error(err)) - } - defer engine.Close() - - // Initialize Redis cache - if err := redis.Init(&config.Redis); err != nil { - common.Fatal("Failed to initialize Redis", zap.Error(err)) - } - defer redis.Close() - - // Initialize storage factory - if err := storage.InitStorageFactory(); err != nil { - common.Fatal("Failed to initialize storage factory", zap.Error(err)) - } - - if err := engine.InitMessageQueueEngine(config.TaskExecutor.MessageQueueType); err != nil { - common.Fatal(fmt.Sprintf("Failed to initialize message queue engine: %w", err)) - } - - // Initialize server variables (runtime variables from Redis) - if err := server.InitVariables(redis.Get()); err != nil { - common.Warn("Failed to initialize server variables from Redis, using defaults", zap.String("error", err.Error())) - } - - // Initialize tokenizer (rag_analyzer) - tokenizerCfg := &tokenizer.PoolConfig{ - DictPath: "/usr/share/infinity/resource", - } - if err := tokenizer.Init(tokenizerCfg); err != nil { - common.Fatal("Failed to initialize tokenizer", zap.Error(err)) - } - defer tokenizer.Close() - - // Initialize global QueryBuilder using tokenizer's DictPath - if err := nlp.InitQueryBuilderFromTokenizer(tokenizerCfg.DictPath); err != nil { - common.Fatal("Failed to initialize query builder", zap.Error(err)) - } - - ingestor := ingestion.NewIngestor(name, 2, []string{"pdf", "docx", "txt"}) - - go func() { - err := ingestor.Start() - if err != nil { - common.Error("Failed to initialize ingestor", err) - return - } - }() - - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2) - - // Print all configuration settings - server.PrintAll() - common.Info("\n ____ __ _\n" + - " / _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____\n" + - " / // __ \\/ __ `/ _ \\/ ___/ __/ / __ \\/ __ \\ / ___/ _ \\/ ___/ | / / _ \\/ ___/\n" + - " _/ // / / / /_/ / __(__ ) /_/ / /_/ / / / / (__ ) __/ / | |/ / __/ /\n" + - "/___/_/ /_/\\__, /\\___/____/\\__/_/\\____/_/ /_/ /____/\\___/_/ |___/\\___/_/\n" + - " /____/\n") - - // Print RAGFlow version - common.Info(fmt.Sprintf("RAGFlow ingestion service version: %s", utility.GetRAGFlowVersion())) - - // Get local IP address for heartbeat reporting - localIP, err := utility.GetLocalIP() - if err != nil { - common.Fatal("fail to get local ip address") - } - - // Initialize and start heartbeat reporter to admin server - service.AdminServiceClient = service.NewAdminClient( - common.Logger, - common.ServerTypeIngestion, - fmt.Sprintf("ingestor-%s", ingestor.ID()), - localIP, - -1, - ) - if err = service.AdminServiceClient.InitHTTPClient(); err != nil { - common.Warn("Failed to initialize heartbeat service", zap.Error(err)) - } else { - // Start heartbeat reporter with 30 seconds interval - heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() { - if err = service.AdminServiceClient.SendHeartbeat(); err == nil { - local.SetAdminStatus(0, "") - } else { - local.SetAdminStatus(1, err.Error()) - //logger.Warn(fmt.Sprintf(err.Error())) - } - }) - heartbeatReporter.Start() - defer heartbeatReporter.Stop() - } - - // Wait for either an OS signal or a shutdown command from the admin - select { - case sig := <-quit: - common.Info("Received signal", zap.String("signal", sig.String())) - common.Info(fmt.Sprintf("Shutting down RAGFlow ingestor %s ...", name)) - case <-ingestor.ShutdownCh: - common.Info(fmt.Sprintf("Received shutdown command from admin, stopping ingestor %s ...", name)) - } - - // Create context with timeout for graceful shutdown - _, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - ingestor.Stop() - - common.Info(fmt.Sprintf("Ingestor %s shutdown complete", name)) -} diff --git a/cmd/ragflow-cli.go b/cmd/ragflow-cli.go index be5c3dfa8d..0b088a2c08 100644 --- a/cmd/ragflow-cli.go +++ b/cmd/ragflow-cli.go @@ -30,19 +30,19 @@ import ( func main() { - parseArgs, err := cli.ParseArgs(os.Args[1:]) + arguments, err := cli.ParseArgs(os.Args[1:]) if err != nil { return } - if parseArgs.ShowHelp { + if arguments.ShowHelp { cli.PrintUsage() return } - //parseArgs.Print() + //arguments.Print() logLevel := "warn" // Default to warn (quiet mode) - if parseArgs.Verbose { + if arguments.Verbose { logLevel = "info" } @@ -50,7 +50,7 @@ func main() { fmt.Printf("Warning: Failed to initialize logger: %v\n", err) } - client, err := cli.NewCLIWithConfig(parseArgs) + client, err := cli.NewCLIWithConfig(arguments) if err != nil { fmt.Printf("Failed to create CLI: %v\n", err) os.Exit(1) @@ -64,8 +64,8 @@ func main() { os.Exit(0) }() - if parseArgs.Command != nil { - if err = client.RunSingleCommand(parseArgs.Command); err != nil { + if arguments.Command != nil { + if err = client.RunSingleCommand(arguments.Command); err != nil { fmt.Printf("Command execution failed: %v\n", err) os.Exit(1) } diff --git a/cmd/server_main.go b/cmd/ragflow_main.go similarity index 52% rename from cmd/server_main.go rename to cmd/ragflow_main.go index c585af7f22..bcdf276194 100644 --- a/cmd/server_main.go +++ b/cmd/ragflow_main.go @@ -1,5 +1,3 @@ -//go:build ignore - // // Copyright 2026 The InfiniFlow Authors. All Rights Reserved. // @@ -21,17 +19,26 @@ package main import ( "context" "errors" - "flag" "fmt" "net/http" "os" "os/signal" - "ragflow/internal/common" - "ragflow/internal/engine/redis" - "ragflow/internal/server" + "ragflow/internal/admin" + "ragflow/internal/agent/audio" + "ragflow/internal/agent/canvas" + "ragflow/internal/agent/runtime" + agenttool "ragflow/internal/agent/tool" + "ragflow/internal/handler" + "ragflow/internal/ingestion" + "ragflow/internal/mcp" + "ragflow/internal/router" "ragflow/internal/server/local" + "ragflow/internal/service" + "ragflow/internal/service/chunk" + "ragflow/internal/service/nlp" "ragflow/internal/storage" - "ragflow/internal/utility" + "ragflow/internal/tokenizer" + "strconv" "strings" "syscall" "time" @@ -39,92 +46,256 @@ import ( "github.com/gin-gonic/gin" "go.uber.org/zap" - "ragflow/internal/agent/audio" - "ragflow/internal/agent/canvas" - _ "ragflow/internal/agent/component" // blank import: registers every Component factory (Begin / Agent / LLM / Message / Retrieval / ...) into the shared runtime at package init - "ragflow/internal/agent/runtime" - agenttool "ragflow/internal/agent/tool" + _ "ragflow/internal/agent/component" + "ragflow/internal/common" "ragflow/internal/dao" "ragflow/internal/engine" - "ragflow/internal/handler" - "ragflow/internal/mcp" - "ragflow/internal/router" - "ragflow/internal/service" - "ragflow/internal/service/chunk" - "ragflow/internal/service/nlp" - "ragflow/internal/tokenizer" + "ragflow/internal/engine/redis" + "ragflow/internal/server" + "ragflow/internal/utility" ) -func printHelp() { - fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS]\n\n", os.Args[0]) - fmt.Fprintf(os.Stderr, "RAGFlow Server - Open-source RAG engine based on deep document understanding\n\n") - fmt.Fprintf(os.Stderr, "Options:\n") - fmt.Fprintf(os.Stderr, " -p, --port int\t\tServer port (overrides config file)\n") - fmt.Fprintf(os.Stderr, " -v, --version \tPrint version information and exit\n") - fmt.Fprintf(os.Stderr, " --debug \tEnable debug-level logging\n") - fmt.Fprintf(os.Stderr, " -h, --help \tShow this help message and exit\n") - fmt.Fprintf(os.Stderr, "\nExamples:\n") - fmt.Fprintf(os.Stderr, " %s \t\t# Start server with config file port\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s -p 8080 \t\t# Start server on port 8080\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s --port 8080 \t# Start server on port 8080\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s --version \t# Show version and exit\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s --debug \t# Start server with debug logging\n", os.Args[0]) +type serverArgs struct { + mode *string // admin | api | ingestor + helpFlag bool + versionFlag bool + debugLog bool + configPath *string // Used by admin, api; user defined config path + initSuperUser bool // Used by admin; + port *int // Used by admin, api + adminHost *string // Used by api and ingestor for heartbeat + adminPort *int // Used by api and ingestor for heartbeat, "ip:port" + name *string // server name +} + +func parseArgs() (*serverArgs, error) { + args := &serverArgs{} + + var serverMode string + var configPath string + for i := 1; i < len(os.Args); i++ { + arg := os.Args[i] + switch arg { + case "--admin": + serverMode = "admin" + args.mode = &serverMode + case "--ingestor": + serverMode = "ingestor" + args.mode = &serverMode + case "--api": + serverMode = "api" + args.mode = &serverMode + case "-h", "--help": + args.helpFlag = true + case "-v", "--version": + args.versionFlag = true + case "--debug": + args.debugLog = true + case "-f", "--config": + if i+1 >= len(os.Args) { + return nil, fmt.Errorf("%s requires a value", arg) + } + i++ + configPath = os.Args[i] + args.configPath = &configPath + case "--init-superuser": + args.initSuperUser = true + case "-p", "--port": + if i+1 >= len(os.Args) { + return nil, errors.New("--port requires a value") + } + i++ + port, convErr := strconv.Atoi(os.Args[i]) + if convErr != nil { + return nil, fmt.Errorf("invalid port: %w", convErr) + } + args.port = &port + if port <= 0 || port > 65535 { + return nil, fmt.Errorf("invalid port: %d", port) + } + case "--admin-host": + if i+1 >= len(os.Args) { + return nil, errors.New("--admin-host requires a value") + } + i++ + parts := strings.SplitN(os.Args[i], ":", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return nil, errors.New("--admin-host must be in the form 'ip:port'") + } + ip, portStr := parts[0], parts[1] + port, convErr := strconv.Atoi(portStr) + if convErr != nil { + return nil, fmt.Errorf("failed to parse admin port: %w", convErr) + } + args.adminHost = &ip + args.adminPort = &port + case "--name": + if i+1 >= len(os.Args) { + return nil, errors.New("--name requires a value") + } + i++ + args.name = &os.Args[i] + default: + return nil, fmt.Errorf("unknown parameter: %s", arg) + } + } + return args, nil +} + +func printHelp(args *serverArgs) { + switch { + case args.mode == nil: + fmt.Fprintf(os.Stderr, "Usage: %s --api|--admin|--ingestor [OPTIONS]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "RAGFlow Server - Open-source RAG engine based on deep document understanding\n\n") + fmt.Fprintf(os.Stderr, "Mode selection (default: --api):\n") + fmt.Fprintf(os.Stderr, " --api \tRun as API server\n") + fmt.Fprintf(os.Stderr, " --admin \tRun as admin server\n") + fmt.Fprintf(os.Stderr, " --ingestor \tRun as ingestion worker\n\n") + fmt.Fprintf(os.Stderr, "Common options:\n") + fmt.Fprintf(os.Stderr, " --config string\tPath to configuration file\n") + fmt.Fprintf(os.Stderr, " -v, --version \tPrint version information and exit\n") + fmt.Fprintf(os.Stderr, " --debug \tEnable debug-level logging\n") + fmt.Fprintf(os.Stderr, " -h, --help \tShow this help message and exit\n\n") + fmt.Fprintf(os.Stderr, "Run '%s --api --help' for API server options.\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Run '%s --admin --help' for admin server options.\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Run '%s --ingestor --help' for ingester options.\n", os.Args[0]) + case *args.mode == "api": + fmt.Fprintf(os.Stderr, "Usage: %s --api [OPTIONS]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "RAGFlow API Server\n\n") + fmt.Fprintf(os.Stderr, "Options:\n") + fmt.Fprintf(os.Stderr, " --port int \tServer port (overrides config file)\n") + fmt.Fprintf(os.Stderr, " -f --config string\tPath to configuration file\n") + fmt.Fprintf(os.Stderr, " -v, --version \tPrint version information and exit\n") + fmt.Fprintf(os.Stderr, " --debug \tEnable debug-level logging\n") + fmt.Fprintf(os.Stderr, " -h, --help \tShow this help message and exit\n") + case *args.mode == "admin": + fmt.Fprintf(os.Stderr, "Usage: %s --admin [OPTIONS]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "RAGFlow Admin Server\n\n") + fmt.Fprintf(os.Stderr, "Options:\n") + fmt.Fprintf(os.Stderr, " -f --config string\t\tPath to configuration file\n") + fmt.Fprintf(os.Stderr, " --port int \t\t\tServer port (overrides config file)\n") + fmt.Fprintf(os.Stderr, " --init-superuser\t\t\tInitialize superuser account\n") + fmt.Fprintf(os.Stderr, " -v, --version \t\t\tPrint version information and exit\n") + fmt.Fprintf(os.Stderr, " --debug \t\t\tEnable debug-level logging\n") + fmt.Fprintf(os.Stderr, " -h, --help \t\t\tShow this help message and exit\n") + case *args.mode == "ingestor": + fmt.Fprintf(os.Stderr, "Usage: %s --ingestor [OPTIONS]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "RAGFlow Ingestion Worker - Document ingestion processing\n\n") + fmt.Fprintf(os.Stderr, "Options:\n") + fmt.Fprintf(os.Stderr, " -f --config string\tPath to config file\n") + fmt.Fprintf(os.Stderr, " --name string\t\t\tIngestion server name (default: \"default_ingestion\")\n") + fmt.Fprintf(os.Stderr, " --admin-host string\tAdmin server host:port (overrides config file)\n") + fmt.Fprintf(os.Stderr, " -v, --version \t\tPrint version information and exit\n") + fmt.Fprintf(os.Stderr, " --debug \t\tEnable debug-level logging\n") + fmt.Fprintf(os.Stderr, " -h, --help \t\tShow this help message and exit\n") + } } func main() { - // Parse command line flags - var portFlag int - flag.IntVar(&portFlag, "port", 0, "Server port (overrides config file)") - flag.IntVar(&portFlag, "p", 0, "Server port (shorthand, overrides config file)") - var debugFlag bool - flag.BoolVar(&debugFlag, "debug", false, "Enable debug-level logging") - var versionFlag bool - flag.BoolVar(&versionFlag, "version", false, "Print version information and exit") + arguments, err := parseArgs() + if err != nil { + fmt.Printf("Failed to parse arguments: %v\n", err) + return + } - // Custom help message - flag.Usage = printHelp + if arguments.helpFlag || arguments.mode == nil { + printHelp(arguments) + return + } - flag.Parse() - - // Handle --version flag: print version and exit immediately - if versionFlag { + if arguments.versionFlag { fmt.Printf("RAGFlow version: %s\n", utility.GetRAGFlowVersion()) return } - // Temporarily default to debug while investigating the Go chat/SSE path. - if err := common.Init("debug", common.FileOutput{Path: "server_main.log"}); err != nil { - panic(fmt.Sprintf("Failed to initialize logger: %v", err)) + // Initialize local variables (runtime variables from Redis) + err = server.InitLocalVariables() + if err != nil { + + fmt.Printf("Failed to start %s server: %v\n", *arguments.mode, err) + os.Exit(1) + } + + // Temporary logger initialization + var logFile string + var serverName string + if arguments.name != nil { + serverName = *arguments.name + } else { + serverName = fmt.Sprintf("%s_server", *arguments.mode) + } + logFile = fmt.Sprintf("%s.log", serverName) + + logLevel := "info" + if arguments.debugLog { + logLevel = "debug" + } + + if err = common.Init(logLevel, common.FileOutput{Path: logFile}); err != nil { + panic("failed to initialize logger: " + err.Error()) } // Initialize configuration - if err := server.Init(""); err != nil { - common.Fatal("Failed to initialize config", zap.Error(err)) + var configPath string + if arguments.configPath != nil { + configPath = *arguments.configPath + } + + if err = server.Init(configPath); err != nil { + common.Error("Failed to initialize configuration", err) + os.Exit(1) } - // Override port with command line argument if provided config := server.GetConfig() - if portFlag > 0 { - config.Server.Port = portFlag - common.Info("Port overridden by command line argument", zap.Int("port", portFlag)) + + // override default port if provided + switch *arguments.mode { + case "api": + port := config.Server.Port + if arguments.port != nil { + port = *arguments.port + config.Server.Port = port + } + if arguments.name == nil { + serverName = fmt.Sprintf("api_server_%d", port) + } + case "admin": + port := config.Admin.Port + if arguments.port != nil { + port = *arguments.port + config.Admin.Port = port + } + if arguments.name == nil { + serverName = fmt.Sprintf("admin_server_%d", port) + } + case "ingestor": + if serverName == "" { + uuid := common.GenerateUUID() + serverName = fmt.Sprintf("ingestor_server_%s", uuid) + } + default: + common.Error("invalid server mode", errors.New(*arguments.mode)) + os.Exit(1) } - if config.Server.Port == 0 { - common.Fatal("Server port is not configured. Please specify via --port flag or config file.") - } + // set server name and log file path + server.SetServerName(serverName) + logFile = fmt.Sprintf("%s.log", serverName) // Reinitialize logger with configured level if different - level := config.Log.Level - if level == "" { - level = "debug" + logLevel = config.Log.Level + if logLevel == "" { + logLevel = "info" } - if debugFlag { - level = "debug" + if arguments.debugLog { + logLevel = "debug" } + config.Log.Level = logLevel + fileOut := common.FileOutput{ - Path: "server_main.log", + Path: logFile, MaxSize: config.Log.MaxSize, MaxBackups: config.Log.MaxBackups, MaxAge: config.Log.MaxAge, @@ -133,50 +304,226 @@ func main() { if config.Log.Path != "" { fileOut.Path = config.Log.Path } - if err := common.Init(level, fileOut); err != nil { - common.Error("Failed to reinitialize logger", err) - } - server.SetLogger(common.Logger) - if config.Log.Level == "" { - config.Log.Level = common.GetLevel() + if err = common.Init(logLevel, fileOut); err != nil { + common.Error("Failed to reinitialize logger with configured level", err) } - common.Info("Server mode", zap.String("mode", config.Server.Mode)) + server.SetLogger(common.Logger) // Print all configuration settings + common.Info(fmt.Sprintf("Starting %s server: %s, mode: %s", *arguments.mode, serverName, config.Server.Mode)) server.PrintAll() // Initialize database - if err := dao.InitDB(); err != nil { + if err = dao.InitDB(); err != nil { common.Fatal("Failed to initialize database", zap.Error(err)) } // Initialize doc engine - if err := engine.Init(&config.DocEngine); err != nil { + if err = engine.Init(&config.DocEngine); err != nil { common.Fatal("Failed to initialize doc engine", zap.Error(err)) } defer engine.Close() // Initialize Redis cache - if err := redis.Init(&config.Redis); err != nil { + if err = redis.Init(&config.Redis); err != nil { common.Fatal("Failed to initialize Redis", zap.Error(err)) } defer redis.Close() - if err := storage.InitStorageFactory(); err != nil { + if err = storage.InitStorageFactory(); err != nil { common.Fatal("Failed to initialize storage factory", zap.Error(err)) } - if err := engine.InitMessageQueueEngine(config.TaskExecutor.MessageQueueType); err != nil { - common.Error("Failed to initialize message queue engine", err) + if err = engine.InitMessageQueueEngine(config.TaskExecutor.MessageQueueType); err != nil { + common.Fatal("Failed to initialize message queue engine", zap.Error(err)) } // Initialize server variables (runtime variables that can change during operation) // This must be done after Cache is initialized - if err := server.InitVariables(redis.Get()); err != nil { + if err = server.InitVariables(redis.Get()); err != nil { common.Warn("Failed to initialize server variables from Redis, using defaults", zap.String("error", err.Error())) } + if arguments.name == nil { + arguments.name = &serverName + } + + switch *arguments.mode { + case "api": + if err = runAPI(arguments); err != nil { + fmt.Printf("Failed to start API server: %v\n", err) + os.Exit(1) + } + case "admin": + if err = runAdmin(arguments); err != nil { + fmt.Printf("Failed to start admin server: %v\n", err) + os.Exit(1) + } + case "ingestor": + if err = runIngestor(arguments); err != nil { + fmt.Printf("Failed to start ingestion worker: %v\n", err) + os.Exit(1) + } + default: + fmt.Printf("Invalid server mode: %s\n", *arguments.mode) + os.Exit(1) + } +} + +func runAdmin(args *serverArgs) error { + adminService := admin.NewService() + adminHandler := admin.NewHandler(adminService) + + if args.initSuperUser { + // Initialize default admin user + if err := adminService.InitDefaultAdmin(); err != nil { + common.Error("Failed to initialize default admin user", err) + } + } + + // Initialize router + r := admin.NewRouter(adminHandler) + + // Create Gin engine + ginEngine := gin.New() + + // Middleware + ginEngine.Use(common.GinLogger()) + ginEngine.Use(gin.Recovery()) + + // Setup routes + r.Setup(ginEngine) + + // Create HTTP server + config := server.GetConfig() + addr := fmt.Sprintf(":%d", config.Admin.Port) + srv := &http.Server{ + Addr: addr, + Handler: ginEngine, + } + + // Print all configuration settings + server.PrintAll() + + // Print RAGFlow Admin logo + common.Info("" + + "\n ____ ___ ______________ ___ __ _ \n" + + " / __ \\/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___ \n" + + " / /_/ / /| |/ / __/ /_ / / __ \\ | /| / / / /| |/ __ / __ `__ \\/ / __ \\ \n" + + " / _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / /\n" + + " /_/ |_/_/ |_\\____/_/ /_/\\____/|__/|__/ /_/ |_\\__,_/_/ /_/ /_/_/_/ /_/ \n") + + // Print RAGFlow version + common.Info(fmt.Sprintf("RAGFlow admin version: %s", utility.GetRAGFlowVersion())) + + // Start HTTP server in a goroutine + go func() { + common.Info(fmt.Sprintf("Starting RAGFlow admin HTTP server on port: %d", config.Admin.Port)) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + common.Fatal("Failed to start server", zap.Error(err)) + } + }() + + // Wait for interrupt signal to gracefully shutdown + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2) + sig := <-quit + + common.Info("Received signal", zap.String("signal", sig.String())) + common.Info("Shutting down RAGFlow HTTP server...") + + // Create context with timeout for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Shutdown HTTP server + if err := srv.Shutdown(ctx); err != nil { + common.Fatal("Server forced to shutdown", zap.Error(err)) + } + + common.Info("Admin HTTP server exited") + return nil +} + +func runIngestor(args *serverArgs) error { + + ingestor := ingestion.NewIngestor(*args.name, 2, []string{"pdf", "docx", "txt"}) + + go func() { + err := ingestor.Start() + if err != nil { + common.Error("Failed to initialize ingestor", err) + return + } + }() + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2) + + // Print all configuration settings + server.PrintAll() + common.Info("\n ____ __ _\n" + + " / _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____\n" + + " / // __ \\/ __ `/ _ \\/ ___/ __/ / __ \\/ __ \\ / ___/ _ \\/ ___/ | / / _ \\/ ___/\n" + + " _/ // / / / /_/ / __(__ ) /_/ / /_/ / / / / (__ ) __/ / | |/ / __/ /\n" + + "/___/_/ /_/\\__, /\\___/____/\\__/_/\\____/_/ /_/ /____/\\___/_/ |___/\\___/_/\n" + + " /____/\n") + + // Print RAGFlow version + common.Info(fmt.Sprintf("RAGFlow ingestion service version: %s", utility.GetRAGFlowVersion())) + + // Get local IP address for heartbeat reporting + localIP, err := utility.GetLocalIP() + if err != nil { + common.Fatal("fail to get local ip address") + } + + // Initialize and start heartbeat reporter to admin server + service.AdminServiceClient = service.NewAdminClient( + common.Logger, + common.ServerTypeIngestion, + fmt.Sprintf("ingestor-%s", ingestor.ID()), + localIP, + -1, + ) + if err = service.AdminServiceClient.InitHTTPClient(); err != nil { + common.Warn("Failed to initialize heartbeat service", zap.Error(err)) + } else { + // Start heartbeat reporter with 30 seconds interval + heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() { + if err = service.AdminServiceClient.SendHeartbeat(); err == nil { + local.SetAdminStatus(0, "") + } else { + local.SetAdminStatus(1, err.Error()) + //logger.Warn(fmt.Sprintf(err.Error())) + } + }) + heartbeatReporter.Start() + defer heartbeatReporter.Stop() + } + + // Wait for either an OS signal or a shutdown command from the admin + select { + case sig := <-quit: + common.Info("Received signal", zap.String("signal", sig.String())) + common.Info(fmt.Sprintf("Shutting down RAGFlow ingestor %s ...", *args.name)) + case <-ingestor.ShutdownCh: + common.Info(fmt.Sprintf("Received shutdown command from admin, stopping ingestor %s ...", *args.name)) + } + + // Create context with timeout for graceful shutdown + _, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ingestor.Stop() + + common.Info(fmt.Sprintf("Ingestor %s shutdown complete", *args.name)) + + return nil +} + +func runAPI(args *serverArgs) error { // Initialize admin status (default: unavailable=1) local.InitAdminStatus(1, "admin server not connected") @@ -199,9 +546,12 @@ func main() { common.Fatal("Failed to initialize query builder", zap.Error(err)) } + config := server.GetConfig() startServer(config) common.Info("Server exited") + + return nil } func startServer(config *server.Config) { @@ -217,7 +567,7 @@ func startServer(config *server.Config) { userService := service.NewUserService() documentService := service.NewDocumentService() datasetsService := service.NewDatasetService() - knowledgebaseService := service.NewKnowledgebaseService() + datasetService := service.NewKnowledgebaseService() metadataService := service.NewMetadataService() chunkService := chunk.NewChunkService() llmService := service.NewLLMService() @@ -245,11 +595,11 @@ func startServer(config *server.Config) { // Initialize handler layer authHandler := handler.NewAuthHandler() userHandler := handler.NewUserHandler(userService) - tenantHandler := handler.NewTenantHandler(tenantService, userService, knowledgebaseService) + tenantHandler := handler.NewTenantHandler(tenantService, userService, datasetService) documentHandler := handler.NewDocumentHandler(documentService, datasetsService) datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService) systemHandler := handler.NewSystemHandler(systemService) - knowledgebaseHandler := handler.NewKnowledgebaseHandler(knowledgebaseService, userService, documentService) + datasethandler := handler.NewKnowledgebaseHandler(datasetService, userService, documentService) chunkHandler := handler.NewChunkHandler(chunkService, userService) llmHandler := handler.NewLLMHandler(llmService, userService) chatHandler := handler.NewChatHandler(chatService, userService) @@ -282,7 +632,7 @@ func startServer(config *server.Config) { // Install the agent service's Redis-backed run infrastructure // (CheckPointStore / StateSerializer / RunTracker). When Redis // is unreachable (degraded boot, stand-alone mode, no-redis CI) - // the constructors return errors and we fall through to the + // the constructors return errors, and we fall through to the // in-memory / no-tracking path: the agent service treats nil // options as the in-memory test path, so graceful degradation // is a 1-line if-not-nil pass-through — no separate "boot" mode @@ -293,8 +643,7 @@ func startServer(config *server.Config) { agentOpts.stateSerializer, agentOpts.runTracker, ) - agentHandler := handler.NewAgentHandler(agentService, fileService). - WithDocumentService(documentService) + agentHandler := handler.NewAgentHandler(agentService, fileService) // Public chatbot/agentbot endpoints (api/v1/chatbots/..., // api/v1/agentbots/...) and the agent attachment download. @@ -332,7 +681,7 @@ func startServer(config *server.Config) { docDAO := documentDAO retrievalService := nlp.NewRetrievalService(docEngine, docDAO) difyRetrievalHandler := handler.NewDifyRetrievalHandler( - knowledgebaseService, + datasetService, modelProviderService, metadataService, retrievalService, @@ -350,15 +699,47 @@ func startServer(config *server.Config) { // Redis blip at boot stranded canary operators with a 404 they // could not diagnose from the client side. Keep the route hot. var adminRuntimeSelector *runtime.Selector - if rdb := redis.Get().GetClient(); rdb != nil { - adminRuntimeSelector = runtime.NewSelector(rdb, common.Logger) + if redisClient := redis.Get(); redisClient != nil { + if rdb := redisClient.GetClient(); rdb != nil { + adminRuntimeSelector = runtime.NewSelector(rdb, common.Logger) + } } adminRuntimeHandler := handler.NewAdminRuntimeHandler(adminRuntimeSelector) // Initialize router - r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatChannelHandler, langfuseHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, mcpServerHandler, skillSearchHandler, providerHandler, agentHandler, searchBotHandler, difyRetrievalHandler, pluginHandler, modelHandler, fileCommitHandler, adminRuntimeHandler, openaiChatHandler, botHandler) + r := router.NewRouter(authHandler, + userHandler, + tenantHandler, + documentHandler, + datasetsHandler, + systemHandler, + datasethandler, + chunkHandler, + llmHandler, + chatHandler, + chatChannelHandler, + langfuseHandler, + chatSessionHandler, + connectorHandler, + searchHandler, + fileHandler, + memoryHandler, + mcpHandler, + mcpServerHandler, + skillSearchHandler, + providerHandler, + agentHandler, + searchBotHandler, + difyRetrievalHandler, + pluginHandler, + modelHandler, + fileCommitHandler, + adminRuntimeHandler, + openaiChatHandler, + botHandler) + + // Create Gin enginegit diff - // Create Gin engine ginEngine := gin.New() // Middleware @@ -461,7 +842,7 @@ type agentRunOptions struct { // buildAgentRunOptions installs the Redis-backed run infrastructure // when Redis is available. The Redis client is the one already -// initialised at the top of main; the TTL is a conservative 24h for +// initialized at the top of main; the TTL is a conservative 24h for // both the checkpoint store and the run tracker. On any error // (Redis down at boot, constructor panic, nil-Redis fallback) we // log and return a zero-value struct — the agent service falls back diff --git a/docker/.env b/docker/.env index ba5b4b6e2f..a3ace5e4c1 100644 --- a/docker/.env +++ b/docker/.env @@ -159,6 +159,7 @@ GO_HTTP_PORT=9384 GO_ADMIN_PORT=9383 # API_PROXY_SCHEME=hybrid # go and python hybrid deploy mode +# API_PROXY_SCHEME=go # use go server deployment API_PROXY_SCHEME=python # use pure python server deployment # Development-only: set to 1 to bypass host safety checks for test_db_connection and allow private/local database hosts. diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index c8d279fbbe..915740f422 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -248,24 +248,6 @@ function ensure_db_init() { echo "Database tables initialized." } -function wait_for_server() { - local url="$1" - local server_name="$2" - local timeout=90 - local interval=2 - local start_time=$(date +%s) - - echo "Waiting for $server_name to be ready at $url..." - while ! curl -f -s -o /dev/null "$url"; do - if [ $(($(date +%s) - start_time)) -gt $timeout ]; then - echo "Timeout waiting for $server_name after $timeout seconds" - return 1 - fi - sleep $interval - done - echo "$server_name is ready." -} - # ----------------------------------------------------------------------------- # Start components based on flags # ----------------------------------------------------------------------------- @@ -283,43 +265,44 @@ if [[ "${INIT_MODEL_PROVIDER_TABLES}" -eq 1 ]]; then echo "Model provider table migrations completed." fi -if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then - echo "Starting nginx..." - /usr/sbin/nginx - - while true; do - echo "Attempt to start RAGFlow server..." - "$PY" api/ragflow_server.py ${INIT_SUPERUSER_ARGS} - echo "RAGFlow python server started." - sleep 1; - done & +if [[ "${ENABLE_ADMIN_SERVER}" -eq 1 ]]; then + if [[ "${API_PROXY_SCHEME}" == "hybrid" ]] || [[ "${API_PROXY_SCHEME}" == "python" ]]; then + while true; do + echo "Attempt to start Admin python server..." + "$PY" admin/server/admin_server.py + echo "Admin python server started" + sleep 1; + done & + fi if [[ "${API_PROXY_SCHEME}" == "hybrid" ]] || [[ "${API_PROXY_SCHEME}" == "go" ]]; then while true; do - echo "Attempt to start RAGFlow go server..." - wait_for_server "http://127.0.0.1:9380/api/v1/system/healthz" "ragflow_server" - echo "Starting RAGFlow go server..." - bin/ragflow_server + echo "Starting Admin go server..." + bin/ragflow_main --admin + echo "Admin go server started." sleep 1; done & fi fi +if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then + echo "Starting nginx..." + /usr/sbin/nginx -if [[ "${ENABLE_ADMIN_SERVER}" -eq 1 ]]; then - while true; do - echo "Attempt to start Admin python server..." - "$PY" admin/server/admin_server.py - echo "Admin python server started" - sleep 1; - done & + if [[ "${API_PROXY_SCHEME}" == "hybrid" ]] || [[ "${API_PROXY_SCHEME}" == "python" ]]; then + while true; do + echo "Attempt to start RAGFlow python server..." + "$PY" api/ragflow_server.py ${INIT_SUPERUSER_ARGS} + echo "RAGFlow python server started." + sleep 1; + done & + fi if [[ "${API_PROXY_SCHEME}" == "hybrid" ]] || [[ "${API_PROXY_SCHEME}" == "go" ]]; then while true; do - echo "Attempt to starting Admin go server..." - wait_for_server "http://127.0.0.1:9381/api/v1/admin/ping" "admin_server" - echo "Starting Admin go server..." - bin/admin_server + echo "Starting RAGFlow go server..." + bin/ragflow_main --api + echo "RAGFlow go server started." sleep 1; done & fi @@ -341,17 +324,39 @@ fi if [[ "${ENABLE_TASKEXECUTOR}" -eq 1 ]]; then if [[ "${CONSUMER_NO_END}" -gt "${CONSUMER_NO_BEG}" ]]; then - echo "Starting task executors on host '${HOST_ID}' for IDs in [${CONSUMER_NO_BEG}, ${CONSUMER_NO_END})..." - for (( i=CONSUMER_NO_BEG; i All three native libraries are statically linked — no `LD_LIBRARY_PATH` or `-Wl,-rpath` needed. ## 3. Run Go Version RAGFlow -Note: admin_server must be started first; otherwise, ragflow_server will encounter errors when sending heartbeats. +Note: admin server must be started first; otherwise, api server will encounter errors when sending heartbeats. ```bash # Start admin server -./bin/admin_server +./bin/ragflow_main --admin ``` ```bash # Start RAGFlow server -./bin/ragflow_server +./bin/ragflow_main --api ``` + ```bash -# Run CLI +# Start RAGFlow ingestor +./bin/ragflow_main --ingestor +``` + +```bash +# Run CLI in API mode ./bin/ragflow-cli ``` +```bash +# Run CLI in ADMIN mode +./bin/ragflow-cli --admin +``` + ## 4. Start Frontend ```bash cd web && export API_PROXY_SCHEME=hybrid && npm run dev ``` ## 5. Service Ports & API Routing -- ragflow_server listens on port 9384 -- admin_server listens on port 9383 +- api server listens on port 9384 by default +- admin server listens on port 9383 by default After updating or implementing an API, update the frontend development environment routes in web/vite.config.ts under proxySchemes. diff --git a/internal/engine/elasticsearch/client.go b/internal/engine/elasticsearch/client.go index 0a2bf8c46a..c69c9ae58f 100644 --- a/internal/engine/elasticsearch/client.go +++ b/internal/engine/elasticsearch/client.go @@ -24,7 +24,6 @@ import ( "io" "net/http" "os" - "path/filepath" "ragflow/internal/server" "ragflow/internal/utility" "time" @@ -88,11 +87,11 @@ func NewEngine(cfg interface{}) (*elasticsearchEngine, error) { // Create two index templates for different index types // Template for chunk indices (ragflow_*) - priority 1 - if err := engine.CreateIndexTemplate(context.Background(), "ragflow_mapping", "ragflow_*", "mapping.json", 1); err != nil { + if err = engine.CreateIndexTemplate(context.Background(), "ragflow_mapping", "ragflow_*", "mapping.json", 1); err != nil { return nil, fmt.Errorf("failed to create chunk index template: %w", err) } // Template for doc_meta indices (ragflow_doc_meta_*) - priority 2 (higher than ragflow_*) - if err := engine.CreateIndexTemplate(context.Background(), "ragflow_doc_meta_mapping", "ragflow_doc_meta_*", "doc_meta_es_mapping.json", 2); err != nil { + if err = engine.CreateIndexTemplate(context.Background(), "ragflow_doc_meta_mapping", "ragflow_doc_meta_*", "doc_meta_es_mapping.json", 2); err != nil { return nil, fmt.Errorf("failed to create doc_meta index template: %w", err) } @@ -140,16 +139,20 @@ func (e *elasticsearchEngine) CreateIndexTemplate(ctx context.Context, templateN mappingFileName = "mapping.json" } - // Read mapping from file - mappingPath := filepath.Join(utility.GetProjectRoot(), "conf", mappingFileName) - data, err := os.ReadFile(mappingPath) + mappingPath, err := utility.FindConfFileInProject(mappingFileName) if err != nil { - return fmt.Errorf("failed to read mapping file: %w", err) + return err + } + + // Read mapping from file + data, err := os.ReadFile(*mappingPath) + if err != nil { + return fmt.Errorf("failed to read mapping file %q: %w", *mappingPath, err) } var mapping map[string]interface{} - if err := json.Unmarshal(data, &mapping); err != nil { - return fmt.Errorf("failed to parse mapping file: %w", err) + if err = json.Unmarshal(data, &mapping); err != nil { + return fmt.Errorf("failed to parse mapping file %q: %w", *mappingPath, err) } // Separate settings and mappings from the mapping file @@ -189,7 +192,7 @@ func (e *elasticsearchEngine) CreateIndexTemplate(ctx context.Context, templateN } var result map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + if err = json.NewDecoder(res.Body).Decode(&result); err != nil { return fmt.Errorf("failed to parse response: %w", err) } @@ -215,7 +218,7 @@ func (e *elasticsearchEngine) GetClusterStats() (map[string]interface{}, error) } var rawStats map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&rawStats); err != nil { + if err = json.NewDecoder(res.Body).Decode(&rawStats); err != nil { return nil, fmt.Errorf("failed to decode cluster stats: %w", err) } @@ -278,8 +281,8 @@ func (e *elasticsearchEngine) GetClusterStats() (map[string]interface{}, error) if versions, ok := nodes["versions"].([]interface{}); ok { result["nodes_version"] = versions } - if os, ok := nodes["os"].(map[string]interface{}); ok { - if mem, ok := os["mem"].(map[string]interface{}); ok { + if operatingSystem, ok := nodes["os"].(map[string]interface{}); ok { + if mem, ok := operatingSystem["mem"].(map[string]interface{}); ok { if totalInBytes, ok := mem["total_in_bytes"].(float64); ok { result["os_mem"] = convertBytes(int64(totalInBytes)) } diff --git a/internal/engine/global.go b/internal/engine/global.go index 3a443f7cef..8d9657b74d 100644 --- a/internal/engine/global.go +++ b/internal/engine/global.go @@ -26,8 +26,9 @@ import ( "ragflow/internal/engine/elasticsearch" "ragflow/internal/engine/infinity" - "go.uber.org/zap" "ragflow/internal/tokenizer" + + "go.uber.org/zap" ) var ( @@ -96,6 +97,8 @@ func InitMessageQueueEngine(messageQueueType string) error { if err != nil { return err } + case "": + return fmt.Errorf("message queue type is empty") default: return fmt.Errorf("unsupported message queue type: %s", messageQueueType) } diff --git a/internal/engine/infinity/chunk.go b/internal/engine/infinity/chunk.go index ad71d1d1da..6d6852e9c8 100644 --- a/internal/engine/infinity/chunk.go +++ b/internal/engine/infinity/chunk.go @@ -22,7 +22,6 @@ import ( "fmt" "hash/fnv" "os" - "path/filepath" "ragflow/internal/common" "ragflow/internal/engine/types" "ragflow/internal/utility" @@ -67,23 +66,26 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset common.Info("Creating regular index table", zap.String("tableName", tableName), zap.String("mappingFile", mappingFile)) } - // Use configured schema - fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", mappingFile) - - schemaData, err := os.ReadFile(fpMapping) + fpMapping, err := utility.FindConfFileInProject(mappingFile) if err != nil { - return fmt.Errorf("Failed to read mapping file: %w", err) + return err + } + + // Use configured schema + schemaData, err := os.ReadFile(*fpMapping) + if err != nil { + return fmt.Errorf("failed to read mapping file %s: %w", *fpMapping, err) } var schema orderedFields if err := json.Unmarshal(schemaData, &schema); err != nil { - return fmt.Errorf("Failed to parse mapping file: %w", err) + return fmt.Errorf("failed to parse mapping file: %w", err) } // Get database db, err := e.client.conn.GetDatabase(e.client.dbName) if err != nil { - return fmt.Errorf("Failed to get database: %w", err) + return fmt.Errorf("failed to get database: %w", err) } // Determine vector column name @@ -92,7 +94,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset // Check if table already exists exists, err := e.tableExists(ctx, tableName) if err != nil { - return fmt.Errorf("Failed to check if table exists: %w", err) + return fmt.Errorf("failed to check if table exists: %w", err) } var table *infinity.Table @@ -101,7 +103,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset common.Info("Table already exists, checking for vector column", zap.String("tableName", tableName)) table, err = db.GetTable(tableName) if err != nil { - return fmt.Errorf("Failed to open existing table %s: %w", tableName, err) + return fmt.Errorf("failed to open existing table %s: %w", tableName, err) } // Check if vector column exists (for embedding model changes) @@ -121,7 +123,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset } if _, err := table.AddColumns(addColSchema); err != nil { common.Error("Failed to add vector column "+vectorColName, err) - return fmt.Errorf("Failed to add vector column %s: %w", vectorColName, err) + return fmt.Errorf("failed to add vector column %s: %w", vectorColName, err) } common.Info("Successfully added vector column", zap.String("column", vectorColName)) } @@ -159,7 +161,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset // Create table table, err = db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore) if err != nil { - return fmt.Errorf("Failed to create table: %w", err) + return fmt.Errorf("failed to create table: %w", err) } common.Debug("Infinity created table", zap.String("tableName", tableName)) } @@ -179,7 +181,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset "", ) if err != nil { - return fmt.Errorf("Failed to create HNSW index %s: %w", vectorIndexName, err) + return fmt.Errorf("failed to create HNSW index %s: %w", vectorIndexName, err) } common.Info("Created vector index", zap.String("indexName", vectorIndexName), zap.String("column", vectorColName)) @@ -214,7 +216,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset "", ) if err != nil { - return fmt.Errorf("Failed to create fulltext index %s: %w", indexNameFt, err) + return fmt.Errorf("failed to create fulltext index %s: %w", indexNameFt, err) } } } @@ -250,7 +252,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset "", ) if err != nil { - return fmt.Errorf("Failed to create secondary index %s: %w", indexNameSec, err) + return fmt.Errorf("failed to create secondary index %s: %w", indexNameSec, err) } } } @@ -258,7 +260,7 @@ func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, dataset return nil } -// InsertChunks inserts documents into a dataset table +// InsertChunks inserts documents into a dataset table; // Table name format: {baseName}_{datasetID} // Auto-create the table if it doesn't exist // Delete existing rows with matching IDs before insert diff --git a/internal/engine/infinity/metadata.go b/internal/engine/infinity/metadata.go index dcc7ef34f8..a146458a52 100644 --- a/internal/engine/infinity/metadata.go +++ b/internal/engine/infinity/metadata.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "os" - "path/filepath" "strings" "ragflow/internal/common" @@ -42,29 +41,32 @@ func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID strin // Get database db, err := e.client.conn.GetDatabase(e.client.dbName) if err != nil { - return fmt.Errorf("Failed to get database: %w", err) + return fmt.Errorf("failed to get database: %w", err) } // Check if table already exists exists, err := e.tableExists(ctx, tableName) if err != nil { - return fmt.Errorf("Failed to check if table exists: %w", err) + return fmt.Errorf("failed to check if table exists: %w", err) } if exists { return fmt.Errorf("metadata table '%s' already exists", tableName) } - // Use configured doc_meta mapping file - fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", e.docMetaMappingFileName) - - schemaData, err := os.ReadFile(fpMapping) + fpMapping, err := utility.FindConfFileInProject(e.docMetaMappingFileName) if err != nil { - return fmt.Errorf("Failed to read mapping file: %w", err) + return err + } + + // Use configured doc_meta mapping file + schemaData, err := os.ReadFile(*fpMapping) + if err != nil { + return fmt.Errorf("failed to read mapping file %q: %w", *fpMapping, err) } var schema map[string]fieldInfo - if err := json.Unmarshal(schemaData, &schema); err != nil { - return fmt.Errorf("Failed to parse mapping file: %w", err) + if err = json.Unmarshal(schemaData, &schema); err != nil { + return fmt.Errorf("failed to parse mapping file %q: %w", *fpMapping, err) } // Build column definitions @@ -81,14 +83,14 @@ func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID strin // Create table _, err = db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore) if err != nil { - return fmt.Errorf("Failed to create doc meta table: %w", err) + return fmt.Errorf("failed to create doc meta table: %w", err) } common.Debug("Infinity created doc meta table", zap.String("tableName", tableName)) // Get table for creating indexes table, err := db.GetTable(tableName) if err != nil { - return fmt.Errorf("Failed to get table: %w", err) + return fmt.Errorf("failed to get table: %w", err) } // Create secondary index on id @@ -99,7 +101,7 @@ func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID strin "", ) if err != nil { - return fmt.Errorf("Failed to create secondary index on id: %w", err) + return fmt.Errorf("failed to create secondary index on id: %w", err) } // Create secondary index on kb_id @@ -110,7 +112,7 @@ func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID strin "", ) if err != nil { - return fmt.Errorf("Failed to create secondary index on kb_id: %w", err) + return fmt.Errorf("failed to create secondary index on kb_id: %w", err) } // Create secondary index on meta_fields for metadata filter queries @@ -121,7 +123,7 @@ func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID strin "", ) if err != nil { - return fmt.Errorf("Failed to create secondary index on meta_fields: %w", err) + return fmt.Errorf("failed to create secondary index on meta_fields: %w", err) } return nil @@ -136,7 +138,7 @@ func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[stri db, err := e.client.conn.GetDatabase(e.client.dbName) if err != nil { - return nil, fmt.Errorf("Failed to get database: %w", err) + return nil, fmt.Errorf("failed to get database: %w", err) } table, err := db.GetTable(tableName) @@ -144,17 +146,17 @@ func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[stri // Table doesn't exist, try to create it errMsg := strings.ToLower(err.Error()) if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") { - return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err) + return nil, fmt.Errorf("failed to get table %s: %w", tableName, err) } // Create metadata table if createErr := e.CreateMetadataStore(ctx, tenantID); createErr != nil { - return nil, fmt.Errorf("Failed to create metadata table: %w", createErr) + return nil, fmt.Errorf("failed to create metadata table: %w", createErr) } table, err = db.GetTable(tableName) if err != nil { - return nil, fmt.Errorf("Failed to get table after creation: %w", err) + return nil, fmt.Errorf("failed to get table after creation: %w", err) } } @@ -194,7 +196,7 @@ func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[stri // Insert metadata _, err = table.Insert(insertMetadata) if err != nil { - return nil, fmt.Errorf("Failed to insert metadata: %w", err) + return nil, fmt.Errorf("failed to insert metadata: %w", err) } common.Info("InfinityConnection.InsertMetadata result", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata))) diff --git a/internal/engine/infinity/sql.go b/internal/engine/infinity/sql.go index 879b337f48..e7b15b812c 100644 --- a/internal/engine/infinity/sql.go +++ b/internal/engine/infinity/sql.go @@ -24,7 +24,6 @@ import ( "fmt" "os" "os/exec" - "path/filepath" "regexp" "strconv" "strings" @@ -59,18 +58,23 @@ func loadFieldMapping(mappingFileName string) (aliasToActual map[string]string, if mappingFileName == "" { mappingFileName = "infinity_mapping.json" } - confPath := filepath.Join(utility.GetProjectRoot(), "conf", mappingFileName) - data, err := os.ReadFile(confPath) + + filePath, err := utility.FindConfFileInProject(mappingFileName) + if err != nil { + return nil, nil, err + } + + data, err := os.ReadFile(*filePath) if err != nil { if errors.Is(err, os.ErrNotExist) { return map[string]string{}, map[string]string{}, nil } - return nil, nil, fmt.Errorf("load field mapping %q: %w", confPath, err) + return nil, nil, fmt.Errorf("load field mapping %q: %w", *filePath, err) } fields := map[string]fieldMappingEntry{} - if err := json.Unmarshal(data, &fields); err != nil { - return nil, nil, fmt.Errorf("parse field mapping %q: %w", confPath, err) + if err = json.Unmarshal(data, &fields); err != nil { + return nil, nil, fmt.Errorf("parse field mapping %q: %w", *filePath, err) } aliasToActual = make(map[string]string, len(fields)*2) diff --git a/internal/engine/infinity/sql_test.go b/internal/engine/infinity/sql_test.go index f072d1c47f..755b124791 100644 --- a/internal/engine/infinity/sql_test.go +++ b/internal/engine/infinity/sql_test.go @@ -308,22 +308,6 @@ func TestResolvePsqlHostPort_EmptyHostInURIFallsBackToDefault(t *testing.T) { } } -// ----------------------------------------------------------------------------- -// loadFieldMapping — mirrors infinity_conn_base.py:793-807. -// ----------------------------------------------------------------------------- - -func TestLoadFieldMapping_MissingFileReturnsEmpty(t *testing.T) { - // Use a name that doesn't exist; the function should silently - // return empty maps (matching Python's `os.path.exists` guard). - a2a, r2a, err := loadFieldMapping("nonexistent_mapping_xyz.json") - if err != nil { - t.Fatalf("missing file should be a no-op, got error: %v", err) - } - if len(a2a) != 0 || len(r2a) != 0 { - t.Errorf("missing file should yield empty maps; got a2a=%v r2a=%v", a2a, r2a) - } -} - func TestLoadFieldMapping_ParsesAliases(t *testing.T) { // Write a temporary mapping file. dir := t.TempDir() @@ -391,7 +375,7 @@ func TestLoadFieldMapping_EmptyNameDefaultsToInfinityMappingJSON(t *testing.T) { if err != nil { t.Fatalf("empty name: %v", err) } - if len(a2a) != 0 || len(r2a) != 0 { + if len(a2a) == 0 || len(r2a) == 0 { t.Errorf("empty name + no file should yield empty maps; got a2a=%v r2a=%v", a2a, r2a) } } diff --git a/internal/engine/nats/nats.go b/internal/engine/nats/nats.go index 8e0ebf27de..30d189f9d2 100644 --- a/internal/engine/nats/nats.go +++ b/internal/engine/nats/nats.go @@ -48,15 +48,16 @@ func NewNatsEngine(host string, port int) *NatsEngine { func (n *NatsEngine) Init() error { var err error - n.nc, err = nats.Connect(nats.DefaultURL) + natsURL := fmt.Sprintf("nats://%s:%d", n.host, n.port) + n.nc, err = nats.Connect(natsURL) if err != nil { - return fmt.Errorf("failed to connect to NATS: %w", err) + return fmt.Errorf("failed to connect to NATS at %s: %w", natsURL, err) } n.jetStream, err = jetstream.New(n.nc) if err != nil { n.nc.Close() - return fmt.Errorf("failed to create JetStream context: %w", err) + return fmt.Errorf("failed to create JetStream context at %s: %w", natsURL, err) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -73,19 +74,19 @@ func (n *NatsEngine) Init() error { n.stream, err = n.jetStream.CreateStream(ctx, streamCfg) if err != nil { - if err.Error() != "stream already exists" { + if !strings.Contains(err.Error(), "already exists") { n.nc.Close() - return fmt.Errorf("fail to create stream: %w", err) + return fmt.Errorf("fail to create stream at %s: %w", natsURL, err) } common.Info("NATS stream already exists, use existing stream") n.stream, err = n.jetStream.Stream(ctx, "RAGFLOW_TASKS") if err != nil { n.nc.Close() - return fmt.Errorf("fail to get existing stream: %w", err) + return fmt.Errorf("fail to get existing stream at %s: %w", natsURL, err) } } else { - common.Info("NATS stream create successfully") + common.Info(fmt.Sprintf("NATS stream create successfully at %s", natsURL)) } return nil @@ -181,6 +182,9 @@ func (n *NatsEngine) ListMessages(messageType string, pending bool) ([]map[strin } func (n *NatsEngine) InitConsumer(subject string) error { + if n.stream == nil { + return fmt.Errorf("NATS stream is nil, engine not properly initialized") + } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/internal/server/config.go b/internal/server/config.go index 7a4f88b998..e097964389 100644 --- a/internal/server/config.go +++ b/internal/server/config.go @@ -905,11 +905,11 @@ func PrintAll() { } allSettings := globalViper.AllSettings() - zapLogger.Info("=== All Configuration Settings ===") + zapLogger.Info("=== All Configurations ===") for key, value := range allSettings { zapLogger.Info("config", zap.String("key", key), zap.Any("value", value)) } - zapLogger.Info("=== End Configuration ===") + zapLogger.Info("=== End Configurations ===") } // parseHostPort parses host:port string and returns host and port diff --git a/internal/server/local_variables.go b/internal/server/local_variables.go new file mode 100644 index 0000000000..e3f5547855 --- /dev/null +++ b/internal/server/local_variables.go @@ -0,0 +1,53 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package server + +import ( + "ragflow/internal/common" + "sync" +) + +type LocalVariables struct { + ServerName *string // Server name, can be modified at runtime +} + +var ( + localVariables *LocalVariables + localVariablesOnce sync.Once + localVariablesMu sync.RWMutex +) + +func InitLocalVariables() error { + var initErr error + localVariablesOnce.Do(func() { + localVariables = &LocalVariables{} + common.Info("Local variables initialized successfully") + }) + return initErr +} + +func SetServerName(serverName string) { + localVariablesMu.Lock() + defer localVariablesMu.Unlock() + localVariables.ServerName = &serverName +} + +func GetServerName() string { + localVariablesMu.RLock() + defer localVariablesMu.RUnlock() + return *localVariables.ServerName +} diff --git a/internal/utility/path.go b/internal/utility/path.go index d913a26674..038237cbb9 100644 --- a/internal/utility/path.go +++ b/internal/utility/path.go @@ -17,6 +17,7 @@ limitations under the License. package utility import ( + "fmt" "os" "path/filepath" "runtime" @@ -58,3 +59,53 @@ func GetProjectRoot() string { } return filepath.Dir(filepath.Dir(exe)) } + +func FindConfFileInProject(fileName string) (*string, error) { + + var filePath string + if projDir := os.Getenv("RAG_PROJECT_BASE"); projDir != "" { + filePath = filepath.Join(projDir, "conf", fileName) + if _, err := os.Stat(filePath); err == nil { + return &filePath, nil + } + } + + if projDir := os.Getenv("RAG_DEPLOY_BASE"); projDir != "" { + filePath = filepath.Join(projDir, "conf", fileName) + if _, err := os.Stat(filePath); err == nil { + return &filePath, nil + } + } + + exeFilePath, err := os.Executable() + if err == nil { + projDir := filepath.Dir(filepath.Dir(exeFilePath)) + filePath = filepath.Join(projDir, "conf", fileName) + if _, err = os.Stat(filePath); err == nil { + return &filePath, nil + } + } + + _, curFile, _, _ := runtime.Caller(0) + dir := filepath.Dir(curFile) + for { + if _, err = os.Stat(filepath.Join(dir, "go.mod")); err == nil { + filePath = filepath.Join(dir, "conf", fileName) + if _, err = os.Stat(filePath); err == nil { + return &filePath, nil + } + return nil, fmt.Errorf("conf file %s not found in %s", fileName, dir) + } + parent := filepath.Dir(dir) + if parent == dir { + projDir := filepath.Dir(filepath.Dir(filepath.Dir(filepath.Dir(curFile)))) + filePath = filepath.Join(projDir, "conf", fileName) + if _, err = os.Stat(filePath); err == nil { + return &filePath, nil + } + + return nil, fmt.Errorf("conf file %s not found in %s", fileName, projDir) + } + dir = parent + } +}