From 1aa8abe3734856409a0ca07b825ae5c103ee2fe5 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Fri, 3 Jul 2026 11:14:02 +0800 Subject: [PATCH] Go: file syncer service framework (#16579) ### Summary ./ragflow_main --syncer to start file syncer config yaml file has following config ``` file_syncer: max_concurrent_syncs: 4 # concurrent file sync threads sync_interval: 3 # check interval ``` --------- Signed-off-by: Jin Hai --- cmd/ragflow_main.go | 114 +++++++++++++++++++++--- conf/service_conf.yaml | 3 + docker/service_conf.yaml.template | 3 + internal/common/status_message.go | 6 +- internal/router/router.go | 1 - internal/server/config.go | 6 ++ internal/syncer/syncer.go | 143 ++++++++++++++++++++++++++++++ 7 files changed, 262 insertions(+), 14 deletions(-) create mode 100644 internal/syncer/syncer.go diff --git a/cmd/ragflow_main.go b/cmd/ragflow_main.go index bcdf276194..60e3f8c00a 100644 --- a/cmd/ragflow_main.go +++ b/cmd/ragflow_main.go @@ -37,6 +37,7 @@ import ( "ragflow/internal/service/chunk" "ragflow/internal/service/nlp" "ragflow/internal/storage" + "ragflow/internal/syncer" "ragflow/internal/tokenizer" "strconv" "strings" @@ -56,15 +57,15 @@ import ( ) type serverArgs struct { - mode *string // admin | api | ingestor + mode *string // admin | api | ingestor | syncer helpFlag bool versionFlag bool debugLog bool configPath *string // Used by admin, api; user defined config path initSuperUser bool // Used by admin; port *int // Used by admin, api - adminHost *string // Used by api and ingestor for heartbeat - adminPort *int // Used by api and ingestor for heartbeat, "ip:port" + adminHost *string // Used by api, ingestor, syncer for heartbeat + adminPort *int // Used by api, ingestor, syncer for heartbeat, "ip:port" name *string // server name } @@ -85,6 +86,9 @@ func parseArgs() (*serverArgs, error) { case "--api": serverMode = "api" args.mode = &serverMode + case "--syncer": + serverMode = "syncer" + args.mode = &serverMode case "-h", "--help": args.helpFlag = true case "-v", "--version": @@ -188,6 +192,16 @@ func printHelp(args *serverArgs) { fmt.Fprintf(os.Stderr, " -v, --version \t\tPrint version information and exit\n") fmt.Fprintf(os.Stderr, " --debug \t\tEnable debug-level logging\n") fmt.Fprintf(os.Stderr, " -h, --help \t\tShow this help message and exit\n") + case *args.mode == "syncer": + fmt.Fprintf(os.Stderr, "Usage: %s --syncer [OPTIONS]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "RAGFlow Sync Service - Sync files from source to RAGFlow\n\n") + fmt.Fprintf(os.Stderr, "Options:\n") + fmt.Fprintf(os.Stderr, " -f --config string\tPath to config file\n") + fmt.Fprintf(os.Stderr, " --name string\t\t\tSync service server name (default: \"default_syncer\")\n") + fmt.Fprintf(os.Stderr, " --admin-host string\tAdmin server host:port (overrides config file)\n") + fmt.Fprintf(os.Stderr, " -v, --version \t\tPrint version information and exit\n") + fmt.Fprintf(os.Stderr, " --debug \t\tEnable debug-level logging\n") + fmt.Fprintf(os.Stderr, " -h, --help \t\tShow this help message and exit\n") } } @@ -273,6 +287,11 @@ func main() { uuid := common.GenerateUUID() serverName = fmt.Sprintf("ingestor_server_%s", uuid) } + case "syncer": + if serverName == "" { + uuid := common.GenerateUUID() + serverName = fmt.Sprintf("syncer_server_%s", uuid) + } default: common.Error("invalid server mode", errors.New(*arguments.mode)) os.Exit(1) @@ -365,6 +384,11 @@ func main() { fmt.Printf("Failed to start ingestion worker: %v\n", err) os.Exit(1) } + case "syncer": + if err = runSyncer(arguments); err != nil { + fmt.Printf("Failed to start syncer: %v\n", err) + os.Exit(1) + } default: fmt.Printf("Invalid server mode: %s\n", *arguments.mode) os.Exit(1) @@ -403,9 +427,6 @@ func runAdmin(args *serverArgs) error { Handler: ginEngine, } - // Print all configuration settings - server.PrintAll() - // Print RAGFlow Admin logo common.Info("" + "\n ____ ___ ______________ ___ __ _ \n" + @@ -461,8 +482,6 @@ func runIngestor(args *serverArgs) error { quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2) - // Print all configuration settings - server.PrintAll() common.Info("\n ____ __ _\n" + " / _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____\n" + " / // __ \\/ __ `/ _ \\/ ___/ __/ / __ \\/ __ \\ / ___/ _ \\/ ___/ | / / _ \\/ ___/\n" + @@ -523,6 +542,81 @@ func runIngestor(args *serverArgs) error { return nil } +func runSyncer(args *serverArgs) error { + config := server.GetConfig() + fileSyncer := syncer.NewSyncer(config.FileSyncer.MaxConcurrentSyncs, time.Duration(config.FileSyncer.SyncInterval)*time.Second) + + go func() { + err := fileSyncer.Start() + if err != nil { + common.Error("Failed to initialize file syncer", err) + return + } + }() + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2) + + common.Info("\n _______ __ _____\n" + + " / ____(_) /__ / ___/__ ______ ________ _____\n" + + " / /_ / / / _ \\ \\__ \\/ / / / __ \\/ ___/ _ \\/ ___/\n" + + " / __/ / / / __/ ___/ / /_/ / / / / /__/ __/ /\n" + + " /_/ /_/_/\\___/ /____/\\__, /_/ /_/\\___/\\___/_/\n" + + " /____/ \n") + + // Print RAGFlow version + common.Info(fmt.Sprintf("RAGFlow file syncer service version: %s", utility.GetRAGFlowVersion())) + + // Get local IP address for heartbeat reporting + localIP, err := utility.GetLocalIP() + if err != nil { + common.Fatal("fail to get local ip address") + } + + // Initialize and start heartbeat reporter to admin server + service.AdminServiceClient = service.NewAdminClient( + common.Logger, + common.ServerTypeFileSyncer, + fmt.Sprintf("syncer-%s", fileSyncer.ID()), + localIP, + -1, + ) + if err = service.AdminServiceClient.InitHTTPClient(); err != nil { + common.Warn("Failed to initialize heartbeat service", zap.Error(err)) + } else { + // Start heartbeat reporter with 30 seconds interval + heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() { + if err = service.AdminServiceClient.SendHeartbeat(); err == nil { + local.SetAdminStatus(0, "") + } else { + local.SetAdminStatus(1, err.Error()) + //logger.Warn(fmt.Sprintf(err.Error())) + } + }) + heartbeatReporter.Start() + defer heartbeatReporter.Stop() + } + + // Wait for either an OS signal or a shutdown command from the admin + select { + case sig := <-quit: + common.Info("Received signal", zap.String("signal", sig.String())) + common.Info(fmt.Sprintf("Shutting down RAGFlow file syncer %s ...", *args.name)) + case <-fileSyncer.ShutdownCh: + common.Info(fmt.Sprintf("Received shutdown command from admin, stopping file syncer %s ...", *args.name)) + } + + // Create context with timeout for graceful shutdown + _, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fileSyncer.Stop() + + common.Info(fmt.Sprintf("File syncer %s shutdown complete", *args.name)) + + return nil +} + func runAPI(args *serverArgs) error { // Initialize admin status (default: unavailable=1) local.InitAdminStatus(1, "admin server not connected") @@ -599,7 +693,7 @@ func startServer(config *server.Config) { documentHandler := handler.NewDocumentHandler(documentService, datasetsService) datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService) systemHandler := handler.NewSystemHandler(systemService) - datasethandler := handler.NewKnowledgebaseHandler(datasetService, userService, documentService) + datasetHandler := handler.NewKnowledgebaseHandler(datasetService, userService, documentService) chunkHandler := handler.NewChunkHandler(chunkService, userService) llmHandler := handler.NewLLMHandler(llmService, userService) chatHandler := handler.NewChatHandler(chatService, userService) @@ -713,7 +807,7 @@ func startServer(config *server.Config) { documentHandler, datasetsHandler, systemHandler, - datasethandler, + datasetHandler, chunkHandler, llmHandler, chatHandler, diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml index b535595e52..fe7fdf9126 100644 --- a/conf/service_conf.yaml +++ b/conf/service_conf.yaml @@ -53,6 +53,9 @@ nats: port: 4222 task_executor: message_queue_type: 'nats' +file_syncer: + max_concurrent_syncs: 4 + sync_interval: 3 user_default_llm: default_models: embedding_model: diff --git a/docker/service_conf.yaml.template b/docker/service_conf.yaml.template index ba11385c1d..ec051ae007 100644 --- a/docker/service_conf.yaml.template +++ b/docker/service_conf.yaml.template @@ -65,6 +65,9 @@ nats: port: ${NATS_PORT:-4222} task_executor: message_queue_type: 'nats' +file_syncer: + max_concurrent_syncs: 4 + sync_interval: 3 user_default_llm: default_models: embedding_model: diff --git a/internal/common/status_message.go b/internal/common/status_message.go index ad68edb42b..2c92590213 100644 --- a/internal/common/status_message.go +++ b/internal/common/status_message.go @@ -31,9 +31,9 @@ const ( type ServerType string const ( - ServerTypeAPI ServerType = "api_server" // API server - ServerTypeIngestion ServerType = "ingestor" // Ingestion server - ServerTypeDataCollector ServerType = "data_collector" // Data collection server + ServerTypeAPI ServerType = "api_server" // API server + ServerTypeIngestion ServerType = "ingestor" // Ingestion server + ServerTypeFileSyncer ServerType = "file_syncer" // File syncer server ) type BaseMessage struct { diff --git a/internal/router/router.go b/internal/router/router.go index fa1ee7cc2e..dfe79240e8 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -209,7 +209,6 @@ func (r *Router) Setup(engine *gin.Engine) { RegisterAgentbotRoutes(agentbotGroup, betaMW, r.botHandler) } // Public bot endpoints (authenticated with an SDK beta token, not a session) - apiBetaAuth.GET("/chatbots/:dialog_id/info", r.botHandler.ChatbotInfo) apiBetaAuth.GET("/documents/:id/preview", r.documentHandler.GetDocumentPreview) apiBetaAuth.GET("/documents/images/:image_id", r.documentHandler.GetDocumentImage) apiBetaAuth.GET("/thumbnails", r.documentHandler.GetThumbnail) diff --git a/internal/server/config.go b/internal/server/config.go index e097964389..d2ac97f64a 100644 --- a/internal/server/config.go +++ b/internal/server/config.go @@ -53,6 +53,7 @@ type Config struct { DefaultSuperUser DefaultSuperUser `mapstructure:"default_super_user"` Language string `mapstructure:"language"` TaskExecutor TaskExecutorConfig `mapstructure:"task_executor"` + FileSyncer FileSyncerConfig `mapstructure:"file_syncer"` } // AdminConfig admin server configuration @@ -76,6 +77,11 @@ type TaskExecutorConfig struct { MessageQueueType string `mapstructure:"message_queue_type"` } +type FileSyncerConfig struct { + MaxConcurrentSyncs int `mapstructure:"max_concurrent_syncs"` + SyncInterval int `mapstructure:"sync_interval"` +} + // UserDefaultLLMConfig user default LLM configuration type UserDefaultLLMConfig struct { DefaultModels DefaultModelsConfig `mapstructure:"default_models"` diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go new file mode 100644 index 0000000000..26a11fed85 --- /dev/null +++ b/internal/syncer/syncer.go @@ -0,0 +1,143 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package syncer + +import ( + "context" + "fmt" + "ragflow/internal/common" + "ragflow/internal/dao" + "ragflow/internal/entity" + "sync" + "time" + + "go.uber.org/zap" +) + +// Syncer periodically polls the sync_logs table and dispatches due +// sync/prune tasks to a fixed-size worker pool. +type Syncer struct { + id string + maxConcurrency int + pollInterval time.Duration // how often each worker queries for due tasks + + ctx context.Context + cancel context.CancelFunc + + workerWg sync.WaitGroup + + // ShutdownCh is closed when Stop() completes. + ShutdownCh chan struct{} +} + +// NewSyncer creates a syncer with the given concurrency and poll interval. +func NewSyncer(maxConcurrency int, pollInterval time.Duration) *Syncer { + ctx, cancel := context.WithCancel(context.Background()) + return &Syncer{ + id: common.GenerateUUID(), + maxConcurrency: maxConcurrency, + pollInterval: pollInterval, + ctx: ctx, + cancel: cancel, + ShutdownCh: make(chan struct{}), + } +} + +// Start launches maxConcurrency worker goroutines. +func (s *Syncer) Start() error { + common.Info(fmt.Sprintf("Syncer %s starting with %d workers (poll every %v)", + s.id, s.maxConcurrency, s.pollInterval)) + + for i := 0; i < s.maxConcurrency; i++ { + s.workerWg.Add(1) + go s.workerLoop(i) + } + return nil +} + +// Stop cancels all workers and waits for them to finish. +func (s *Syncer) Stop() { + common.Info(fmt.Sprintf("Stopping syncer %s", s.id)) + s.cancel() + s.workerWg.Wait() + close(s.ShutdownCh) + common.Info(fmt.Sprintf("Syncer %s stopped", s.id)) +} + +func (s *Syncer) ID() string { + return s.id +} + +// workerLoop periodically polls the DB for due tasks until ctx is cancelled. +func (s *Syncer) workerLoop(workerID int) { + defer s.workerWg.Done() + common.Debug(fmt.Sprintf("Syncer worker %d started", workerID)) + + ticker := time.NewTicker(s.pollInterval) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + common.Debug(fmt.Sprintf("Syncer worker %d exiting (ctx cancelled)", workerID)) + return + case <-ticker.C: + s.pollAndExecute(workerID) + } + } +} + +// pollAndExecute queries due sync & prune tasks, picks one, and runs it. +func (s *Syncer) pollAndExecute(workerID int) { + common.Info(fmt.Sprintf("Syncer worker %d polling for due tasks", workerID)) +} + +// executeSyncTask runs a sync task. +func (s *Syncer) executeSyncTask(task *entity.SyncLogs) { + common.Info("Executing sync task", + zap.String("task_id", task.ID), + zap.String("connector_id", task.ConnectorID), + zap.String("kb_id", task.KbID)) + // TODO: implement actual data-source-specific sync logic. + // For now, mark done. + s.markTaskDone(task.ID, task.ConnectorID) +} + +// executePruneTask runs a prune (delete stale docs) task. +func (s *Syncer) executePruneTask(task *entity.SyncLogs) { + common.Info("Executing prune task", + zap.String("task_id", task.ID), + zap.String("connector_id", task.ConnectorID), + zap.String("kb_id", task.KbID)) + // TODO: implement actual prune logic. + s.markTaskDone(task.ID, task.ConnectorID) +} + +// markTaskDone updates task and connector status to DONE. +func (s *Syncer) markTaskDone(taskID, connectorID string) { + db := dao.GetDB() + now := time.Now().Local() + + db.Model(&entity.SyncLogs{}).Where("id = ?", taskID).Updates(map[string]interface{}{ + "status": string(entity.TaskStatusDone), + "update_time": now, + }) + db.Model(&entity.Connector{}).Where("id = ?", connectorID).Updates(map[string]interface{}{ + "status": string(entity.TaskStatusDone), + "update_time": now, + }) +}