Go: file syncer service framework (#16579)

### Summary

./ragflow_main --syncer to start file syncer


config yaml file has following config
```
file_syncer:
  max_concurrent_syncs: 4 # concurrent file sync threads
  sync_interval: 3 # check interval

```

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2026-07-03 11:14:02 +08:00
committed by GitHub
parent 62f94cd59b
commit 1aa8abe373
7 changed files with 262 additions and 14 deletions

View File

@@ -37,6 +37,7 @@ import (
"ragflow/internal/service/chunk"
"ragflow/internal/service/nlp"
"ragflow/internal/storage"
"ragflow/internal/syncer"
"ragflow/internal/tokenizer"
"strconv"
"strings"
@@ -56,15 +57,15 @@ import (
)
type serverArgs struct {
mode *string // admin | api | ingestor
mode *string // admin | api | ingestor | syncer
helpFlag bool
versionFlag bool
debugLog bool
configPath *string // Used by admin, api; user defined config path
initSuperUser bool // Used by admin;
port *int // Used by admin, api
adminHost *string // Used by api and ingestor for heartbeat
adminPort *int // Used by api and ingestor for heartbeat, "ip:port"
adminHost *string // Used by api, ingestor, syncer for heartbeat
adminPort *int // Used by api, ingestor, syncer for heartbeat, "ip:port"
name *string // server name
}
@@ -85,6 +86,9 @@ func parseArgs() (*serverArgs, error) {
case "--api":
serverMode = "api"
args.mode = &serverMode
case "--syncer":
serverMode = "syncer"
args.mode = &serverMode
case "-h", "--help":
args.helpFlag = true
case "-v", "--version":
@@ -188,6 +192,16 @@ func printHelp(args *serverArgs) {
fmt.Fprintf(os.Stderr, " -v, --version \t\tPrint version information and exit\n")
fmt.Fprintf(os.Stderr, " --debug \t\tEnable debug-level logging\n")
fmt.Fprintf(os.Stderr, " -h, --help \t\tShow this help message and exit\n")
case *args.mode == "syncer":
fmt.Fprintf(os.Stderr, "Usage: %s --syncer [OPTIONS]\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "RAGFlow Sync Service - Sync files from source to RAGFlow\n\n")
fmt.Fprintf(os.Stderr, "Options:\n")
fmt.Fprintf(os.Stderr, " -f --config string\tPath to config file\n")
fmt.Fprintf(os.Stderr, " --name string\t\t\tSync service server name (default: \"default_syncer\")\n")
fmt.Fprintf(os.Stderr, " --admin-host string\tAdmin server host:port (overrides config file)\n")
fmt.Fprintf(os.Stderr, " -v, --version \t\tPrint version information and exit\n")
fmt.Fprintf(os.Stderr, " --debug \t\tEnable debug-level logging\n")
fmt.Fprintf(os.Stderr, " -h, --help \t\tShow this help message and exit\n")
}
}
@@ -273,6 +287,11 @@ func main() {
uuid := common.GenerateUUID()
serverName = fmt.Sprintf("ingestor_server_%s", uuid)
}
case "syncer":
if serverName == "" {
uuid := common.GenerateUUID()
serverName = fmt.Sprintf("syncer_server_%s", uuid)
}
default:
common.Error("invalid server mode", errors.New(*arguments.mode))
os.Exit(1)
@@ -365,6 +384,11 @@ func main() {
fmt.Printf("Failed to start ingestion worker: %v\n", err)
os.Exit(1)
}
case "syncer":
if err = runSyncer(arguments); err != nil {
fmt.Printf("Failed to start syncer: %v\n", err)
os.Exit(1)
}
default:
fmt.Printf("Invalid server mode: %s\n", *arguments.mode)
os.Exit(1)
@@ -403,9 +427,6 @@ func runAdmin(args *serverArgs) error {
Handler: ginEngine,
}
// Print all configuration settings
server.PrintAll()
// Print RAGFlow Admin logo
common.Info("" +
"\n ____ ___ ______________ ___ __ _ \n" +
@@ -461,8 +482,6 @@ func runIngestor(args *serverArgs) error {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2)
// Print all configuration settings
server.PrintAll()
common.Info("\n ____ __ _\n" +
" / _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____\n" +
" / // __ \\/ __ `/ _ \\/ ___/ __/ / __ \\/ __ \\ / ___/ _ \\/ ___/ | / / _ \\/ ___/\n" +
@@ -523,6 +542,81 @@ func runIngestor(args *serverArgs) error {
return nil
}
func runSyncer(args *serverArgs) error {
config := server.GetConfig()
fileSyncer := syncer.NewSyncer(config.FileSyncer.MaxConcurrentSyncs, time.Duration(config.FileSyncer.SyncInterval)*time.Second)
go func() {
err := fileSyncer.Start()
if err != nil {
common.Error("Failed to initialize file syncer", err)
return
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2)
common.Info("\n _______ __ _____\n" +
" / ____(_) /__ / ___/__ ______ ________ _____\n" +
" / /_ / / / _ \\ \\__ \\/ / / / __ \\/ ___/ _ \\/ ___/\n" +
" / __/ / / / __/ ___/ / /_/ / / / / /__/ __/ /\n" +
" /_/ /_/_/\\___/ /____/\\__, /_/ /_/\\___/\\___/_/\n" +
" /____/ \n")
// Print RAGFlow version
common.Info(fmt.Sprintf("RAGFlow file syncer service version: %s", utility.GetRAGFlowVersion()))
// Get local IP address for heartbeat reporting
localIP, err := utility.GetLocalIP()
if err != nil {
common.Fatal("fail to get local ip address")
}
// Initialize and start heartbeat reporter to admin server
service.AdminServiceClient = service.NewAdminClient(
common.Logger,
common.ServerTypeFileSyncer,
fmt.Sprintf("syncer-%s", fileSyncer.ID()),
localIP,
-1,
)
if err = service.AdminServiceClient.InitHTTPClient(); err != nil {
common.Warn("Failed to initialize heartbeat service", zap.Error(err))
} else {
// Start heartbeat reporter with 30 seconds interval
heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() {
if err = service.AdminServiceClient.SendHeartbeat(); err == nil {
local.SetAdminStatus(0, "")
} else {
local.SetAdminStatus(1, err.Error())
//logger.Warn(fmt.Sprintf(err.Error()))
}
})
heartbeatReporter.Start()
defer heartbeatReporter.Stop()
}
// Wait for either an OS signal or a shutdown command from the admin
select {
case sig := <-quit:
common.Info("Received signal", zap.String("signal", sig.String()))
common.Info(fmt.Sprintf("Shutting down RAGFlow file syncer %s ...", *args.name))
case <-fileSyncer.ShutdownCh:
common.Info(fmt.Sprintf("Received shutdown command from admin, stopping file syncer %s ...", *args.name))
}
// Create context with timeout for graceful shutdown
_, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
fileSyncer.Stop()
common.Info(fmt.Sprintf("File syncer %s shutdown complete", *args.name))
return nil
}
func runAPI(args *serverArgs) error {
// Initialize admin status (default: unavailable=1)
local.InitAdminStatus(1, "admin server not connected")
@@ -599,7 +693,7 @@ func startServer(config *server.Config) {
documentHandler := handler.NewDocumentHandler(documentService, datasetsService)
datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService)
systemHandler := handler.NewSystemHandler(systemService)
datasethandler := handler.NewKnowledgebaseHandler(datasetService, userService, documentService)
datasetHandler := handler.NewKnowledgebaseHandler(datasetService, userService, documentService)
chunkHandler := handler.NewChunkHandler(chunkService, userService)
llmHandler := handler.NewLLMHandler(llmService, userService)
chatHandler := handler.NewChatHandler(chatService, userService)
@@ -713,7 +807,7 @@ func startServer(config *server.Config) {
documentHandler,
datasetsHandler,
systemHandler,
datasethandler,
datasetHandler,
chunkHandler,
llmHandler,
chatHandler,

View File

@@ -53,6 +53,9 @@ nats:
port: 4222
task_executor:
message_queue_type: 'nats'
file_syncer:
max_concurrent_syncs: 4
sync_interval: 3
user_default_llm:
default_models:
embedding_model:

View File

@@ -65,6 +65,9 @@ nats:
port: ${NATS_PORT:-4222}
task_executor:
message_queue_type: 'nats'
file_syncer:
max_concurrent_syncs: 4
sync_interval: 3
user_default_llm:
default_models:
embedding_model:

View File

@@ -31,9 +31,9 @@ const (
type ServerType string
const (
ServerTypeAPI ServerType = "api_server" // API server
ServerTypeIngestion ServerType = "ingestor" // Ingestion server
ServerTypeDataCollector ServerType = "data_collector" // Data collection server
ServerTypeAPI ServerType = "api_server" // API server
ServerTypeIngestion ServerType = "ingestor" // Ingestion server
ServerTypeFileSyncer ServerType = "file_syncer" // File syncer server
)
type BaseMessage struct {

View File

@@ -209,7 +209,6 @@ func (r *Router) Setup(engine *gin.Engine) {
RegisterAgentbotRoutes(agentbotGroup, betaMW, r.botHandler)
}
// Public bot endpoints (authenticated with an SDK beta token, not a session)
apiBetaAuth.GET("/chatbots/:dialog_id/info", r.botHandler.ChatbotInfo)
apiBetaAuth.GET("/documents/:id/preview", r.documentHandler.GetDocumentPreview)
apiBetaAuth.GET("/documents/images/:image_id", r.documentHandler.GetDocumentImage)
apiBetaAuth.GET("/thumbnails", r.documentHandler.GetThumbnail)

View File

@@ -53,6 +53,7 @@ type Config struct {
DefaultSuperUser DefaultSuperUser `mapstructure:"default_super_user"`
Language string `mapstructure:"language"`
TaskExecutor TaskExecutorConfig `mapstructure:"task_executor"`
FileSyncer FileSyncerConfig `mapstructure:"file_syncer"`
}
// AdminConfig admin server configuration
@@ -76,6 +77,11 @@ type TaskExecutorConfig struct {
MessageQueueType string `mapstructure:"message_queue_type"`
}
type FileSyncerConfig struct {
MaxConcurrentSyncs int `mapstructure:"max_concurrent_syncs"`
SyncInterval int `mapstructure:"sync_interval"`
}
// UserDefaultLLMConfig user default LLM configuration
type UserDefaultLLMConfig struct {
DefaultModels DefaultModelsConfig `mapstructure:"default_models"`

143
internal/syncer/syncer.go Normal file
View File

@@ -0,0 +1,143 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package syncer
import (
"context"
"fmt"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
"sync"
"time"
"go.uber.org/zap"
)
// Syncer periodically polls the sync_logs table and dispatches due
// sync/prune tasks to a fixed-size worker pool.
type Syncer struct {
id string
maxConcurrency int
pollInterval time.Duration // how often each worker queries for due tasks
ctx context.Context
cancel context.CancelFunc
workerWg sync.WaitGroup
// ShutdownCh is closed when Stop() completes.
ShutdownCh chan struct{}
}
// NewSyncer creates a syncer with the given concurrency and poll interval.
func NewSyncer(maxConcurrency int, pollInterval time.Duration) *Syncer {
ctx, cancel := context.WithCancel(context.Background())
return &Syncer{
id: common.GenerateUUID(),
maxConcurrency: maxConcurrency,
pollInterval: pollInterval,
ctx: ctx,
cancel: cancel,
ShutdownCh: make(chan struct{}),
}
}
// Start launches maxConcurrency worker goroutines.
func (s *Syncer) Start() error {
common.Info(fmt.Sprintf("Syncer %s starting with %d workers (poll every %v)",
s.id, s.maxConcurrency, s.pollInterval))
for i := 0; i < s.maxConcurrency; i++ {
s.workerWg.Add(1)
go s.workerLoop(i)
}
return nil
}
// Stop cancels all workers and waits for them to finish.
func (s *Syncer) Stop() {
common.Info(fmt.Sprintf("Stopping syncer %s", s.id))
s.cancel()
s.workerWg.Wait()
close(s.ShutdownCh)
common.Info(fmt.Sprintf("Syncer %s stopped", s.id))
}
func (s *Syncer) ID() string {
return s.id
}
// workerLoop periodically polls the DB for due tasks until ctx is cancelled.
func (s *Syncer) workerLoop(workerID int) {
defer s.workerWg.Done()
common.Debug(fmt.Sprintf("Syncer worker %d started", workerID))
ticker := time.NewTicker(s.pollInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
common.Debug(fmt.Sprintf("Syncer worker %d exiting (ctx cancelled)", workerID))
return
case <-ticker.C:
s.pollAndExecute(workerID)
}
}
}
// pollAndExecute queries due sync & prune tasks, picks one, and runs it.
func (s *Syncer) pollAndExecute(workerID int) {
common.Info(fmt.Sprintf("Syncer worker %d polling for due tasks", workerID))
}
// executeSyncTask runs a sync task.
func (s *Syncer) executeSyncTask(task *entity.SyncLogs) {
common.Info("Executing sync task",
zap.String("task_id", task.ID),
zap.String("connector_id", task.ConnectorID),
zap.String("kb_id", task.KbID))
// TODO: implement actual data-source-specific sync logic.
// For now, mark done.
s.markTaskDone(task.ID, task.ConnectorID)
}
// executePruneTask runs a prune (delete stale docs) task.
func (s *Syncer) executePruneTask(task *entity.SyncLogs) {
common.Info("Executing prune task",
zap.String("task_id", task.ID),
zap.String("connector_id", task.ConnectorID),
zap.String("kb_id", task.KbID))
// TODO: implement actual prune logic.
s.markTaskDone(task.ID, task.ConnectorID)
}
// markTaskDone updates task and connector status to DONE.
func (s *Syncer) markTaskDone(taskID, connectorID string) {
db := dao.GetDB()
now := time.Now().Local()
db.Model(&entity.SyncLogs{}).Where("id = ?", taskID).Updates(map[string]interface{}{
"status": string(entity.TaskStatusDone),
"update_time": now,
})
db.Model(&entity.Connector{}).Where("id = ?", connectorID).Updates(map[string]interface{}{
"status": string(entity.TaskStatusDone),
"update_time": now,
})
}