mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
### What problem does this PR solve?
| # | Method | Endpoint | Description | Git Equivalent |
|---|--------|----------|-------------|----------------|
| 1 | `POST` | `/api/v1/{prefix}/{folder_id}/commits` | Create a
snapshot commit with file changes (add/modify/delete/rename) | `git add`
+ `git commit` |
| 2 | `GET` | `/api/v1/{prefix}/{folder_id}/commits` | List commit
history (paginated) | `git log` |
| 3 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/{commit_id}` | Get
commit detail with file changes | `git show` |
| 4 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/{commit_id}/files` |
List file changes in a commit | `git show --name-status` |
| 5 | `GET` |
`/api/v1/{prefix}/{folder_id}/commits/diff?from=...&to=...` | Compare
two commits and return differences | `git diff` |
| 6 | `GET` | `/api/v1/{prefix}/{folder_id}/changes` | Get uncommitted
changes (add/modify/delete) | `git status` |
| 7 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/{commit_id}/tree` |
Get the folder tree snapshot at commit time | `git ls-tree` |
| 8 | `GET` |
`/api/v1/{prefix}/{folder_id}/commits/{commit_id}/files/{file_id}/content`
| Get a file's content as it existed in a specific commit | `git show
HEAD:file` |
| 9 | `GET` | `/api/v1/{prefix}/{file_id}/versions` | Get version
history for a specific file across all commits | `git log -- file` |
Where `{prefix}/{id}` can be:
- `folders/{folder_id}` — direct folder access
- `workspaces/{workspace_id}` — alias of `folders/{folder_id}`
- `datasets/{dataset_id}` — resolves to the dataset's folder
- `memories/{memory_id}` — resolves to the memory's folder
- `skills/{skill_id}` — resolves to the skill's folder
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
- [x] Documentation Update
354 lines
12 KiB
Go
354 lines
12 KiB
Go
//go:build ignore
|
|
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"ragflow/internal/common"
|
|
"ragflow/internal/server"
|
|
"ragflow/internal/server/local"
|
|
"ragflow/internal/storage"
|
|
"ragflow/internal/utility"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"go.uber.org/zap"
|
|
|
|
"ragflow/internal/agent/runtime"
|
|
"ragflow/internal/cache"
|
|
"ragflow/internal/dao"
|
|
"ragflow/internal/engine"
|
|
"ragflow/internal/handler"
|
|
"ragflow/internal/router"
|
|
"ragflow/internal/service"
|
|
"ragflow/internal/service/chunk"
|
|
"ragflow/internal/service/nlp"
|
|
"ragflow/internal/tokenizer"
|
|
)
|
|
|
|
func printHelp() {
|
|
fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS]\n\n", os.Args[0])
|
|
fmt.Fprintf(os.Stderr, "RAGFlow Server - Open-source RAG engine based on deep document understanding\n\n")
|
|
fmt.Fprintf(os.Stderr, "Options:\n")
|
|
fmt.Fprintf(os.Stderr, " -p, --port int\tServer port (overrides config file)\n")
|
|
fmt.Fprintf(os.Stderr, " -h, --help \tShow this help message and exit\n")
|
|
fmt.Fprintf(os.Stderr, "\nExamples:\n")
|
|
fmt.Fprintf(os.Stderr, " %s # Start server with config file port\n", os.Args[0])
|
|
fmt.Fprintf(os.Stderr, " %s -p 8080 # Start server on port 8080\n", os.Args[0])
|
|
fmt.Fprintf(os.Stderr, " %s --port 8080 # Start server on port 8080\n", os.Args[0])
|
|
}
|
|
|
|
func main() {
|
|
// Parse command line flags
|
|
var portFlag int
|
|
flag.IntVar(&portFlag, "port", 0, "Server port (overrides config file)")
|
|
flag.IntVar(&portFlag, "p", 0, "Server port (shorthand, overrides config file)")
|
|
|
|
// Custom help message
|
|
flag.Usage = printHelp
|
|
|
|
flag.Parse()
|
|
|
|
// Initialize logger with default level
|
|
// logger.Init("info"); // set debug log level
|
|
if err := common.Init("info", "server_main.log"); err != nil {
|
|
panic(fmt.Sprintf("Failed to initialize logger: %v", err))
|
|
}
|
|
|
|
// Initialize configuration
|
|
if err := server.Init(""); err != nil {
|
|
common.Fatal("Failed to initialize config", zap.Error(err))
|
|
}
|
|
|
|
// Override port with command line argument if provided
|
|
config := server.GetConfig()
|
|
if portFlag > 0 {
|
|
config.Server.Port = portFlag
|
|
common.Info("Port overridden by command line argument", zap.Int("port", portFlag))
|
|
}
|
|
|
|
if config.Server.Port == 0 {
|
|
common.Fatal("Server port is not configured. Please specify via --port flag or config file.")
|
|
}
|
|
|
|
// Reinitialize logger with configured level if different
|
|
level := config.Log.Level
|
|
if level == "" {
|
|
level = "info"
|
|
}
|
|
if err := common.Init(level, "server_main.log"); err != nil {
|
|
common.Error("Failed to reinitialize logger", err)
|
|
}
|
|
server.SetLogger(common.Logger)
|
|
if config.Log.Level == "" {
|
|
config.Log.Level = common.GetLevel()
|
|
}
|
|
|
|
common.Info("Server mode", zap.String("mode", config.Server.Mode))
|
|
|
|
// Print all configuration settings
|
|
server.PrintAll()
|
|
|
|
// Initialize database
|
|
if err := dao.InitDB(); err != nil {
|
|
common.Fatal("Failed to initialize database", zap.Error(err))
|
|
}
|
|
|
|
// Initialize doc engine
|
|
if err := engine.Init(&config.DocEngine); err != nil {
|
|
common.Fatal("Failed to initialize doc engine", zap.Error(err))
|
|
}
|
|
defer engine.Close()
|
|
|
|
// Initialize Redis cache
|
|
if err := cache.Init(&config.Redis); err != nil {
|
|
common.Fatal("Failed to initialize Redis", zap.Error(err))
|
|
}
|
|
defer cache.Close()
|
|
|
|
if err := storage.InitStorageFactory(); err != nil {
|
|
common.Fatal("Failed to initialize storage factory", zap.Error(err))
|
|
}
|
|
|
|
if err := engine.InitMessageQueueEngine(config.TaskExecutor.MessageQueueType); err != nil {
|
|
common.Error("Failed to initialize message queue engine", err)
|
|
}
|
|
|
|
// Initialize server variables (runtime variables that can change during operation)
|
|
// This must be done after Cache is initialized
|
|
if err := server.InitVariables(cache.Get()); err != nil {
|
|
common.Warn("Failed to initialize server variables from Redis, using defaults", zap.String("error", err.Error()))
|
|
}
|
|
|
|
// Initialize admin status (default: unavailable=1)
|
|
local.InitAdminStatus(1, "admin server not connected")
|
|
|
|
// Initialize tokenizer (rag_analyzer)
|
|
dictPath := os.Getenv("RAGFLOW_DICT_PATH")
|
|
if dictPath == "" {
|
|
dictPath = "/usr/share/infinity/resource"
|
|
}
|
|
tokenizerCfg := &tokenizer.PoolConfig{
|
|
DictPath: dictPath,
|
|
}
|
|
if err := tokenizer.Init(tokenizerCfg); err != nil {
|
|
common.Fatal("Failed to initialize tokenizer", zap.Error(err))
|
|
}
|
|
defer tokenizer.Close()
|
|
|
|
// Initialize global QueryBuilder using tokenizer's DictPath
|
|
// This ensures the Synonym uses the same wordnet directory as tokenizer
|
|
if err := nlp.InitQueryBuilderFromTokenizer(tokenizerCfg.DictPath); err != nil {
|
|
common.Fatal("Failed to initialize query builder", zap.Error(err))
|
|
}
|
|
|
|
startServer(config)
|
|
|
|
common.Info("Server exited")
|
|
}
|
|
|
|
func startServer(config *server.Config) {
|
|
|
|
// Set Gin mode
|
|
if config.Server.Mode == "release" {
|
|
gin.SetMode(gin.ReleaseMode)
|
|
} else {
|
|
gin.SetMode(gin.DebugMode)
|
|
}
|
|
|
|
// Initialize service layer
|
|
userService := service.NewUserService()
|
|
documentService := service.NewDocumentService()
|
|
datasetsService := service.NewDatasetService()
|
|
knowledgebaseService := service.NewKnowledgebaseService()
|
|
metadataService := service.NewMetadataService()
|
|
chunkService := chunk.NewChunkService()
|
|
llmService := service.NewLLMService()
|
|
tenantService := service.NewTenantService()
|
|
chatService := service.NewChatService()
|
|
chatSessionService := service.NewChatSessionService()
|
|
systemService := service.NewSystemService()
|
|
connectorService := service.NewConnectorService()
|
|
searchService := service.NewSearchService()
|
|
fileService := service.NewFileService()
|
|
memoryService := service.NewMemoryService()
|
|
mcpService := service.NewMCPService()
|
|
modelProviderService := service.NewModelProviderService()
|
|
|
|
// Initialize doc engine for skill search
|
|
docEngine := engine.Get()
|
|
|
|
// Initialize handler layer
|
|
authHandler := handler.NewAuthHandler()
|
|
userHandler := handler.NewUserHandler(userService)
|
|
tenantHandler := handler.NewTenantHandler(tenantService, userService, knowledgebaseService)
|
|
documentHandler := handler.NewDocumentHandler(documentService, datasetsService)
|
|
datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService)
|
|
systemHandler := handler.NewSystemHandler(systemService)
|
|
knowledgebaseHandler := handler.NewKnowledgebaseHandler(knowledgebaseService, userService, documentService)
|
|
chunkHandler := handler.NewChunkHandler(chunkService, userService)
|
|
llmHandler := handler.NewLLMHandler(llmService, userService)
|
|
chatHandler := handler.NewChatHandler(chatService, userService)
|
|
chatSessionHandler := handler.NewChatSessionHandler(chatSessionService, userService)
|
|
connectorHandler := handler.NewConnectorHandler(connectorService, userService)
|
|
searchHandler := handler.NewSearchHandler(searchService, userService)
|
|
fileHandler := handler.NewFileHandler(fileService, userService)
|
|
memoryHandler := handler.NewMemoryHandler(memoryService)
|
|
mcpHandler := handler.NewMCPHandler(mcpService)
|
|
skillSearchHandler := handler.NewSkillSearchHandler(docEngine)
|
|
providerHandler := handler.NewProviderHandler(userService, modelProviderService)
|
|
agentHandler := handler.NewAgentHandler(service.NewAgentService(), fileService)
|
|
searchBotLLM := &handler.SearchBotRealLLM{Svc: modelProviderService}
|
|
searchBotHandler := handler.NewSearchBotHandler(
|
|
searchService,
|
|
tenantService,
|
|
searchBotLLM,
|
|
chunkService,
|
|
)
|
|
searchBotHandler.SetStreamLLM(searchBotLLM)
|
|
searchBotHandler.SetAskService(service.NewAskService(chunkService, nil, 0, 0))
|
|
pluginHandler := handler.NewPluginHandler(service.NewPluginService())
|
|
modelHandler := handler.NewModelHandler(service.NewModelProviderService())
|
|
fileCommitHandler := handler.NewFileCommitHandler(service.NewFileCommitService())
|
|
|
|
// Dify retrieval handler
|
|
docDAO := dao.NewDocumentDAO()
|
|
retrievalService := nlp.NewRetrievalService(docEngine, docDAO)
|
|
difyRetrievalHandler := handler.NewDifyRetrievalHandler(
|
|
knowledgebaseService,
|
|
modelProviderService,
|
|
metadataService,
|
|
retrievalService,
|
|
docDAO,
|
|
docEngine,
|
|
)
|
|
|
|
// Phase 6 per-tenant canvas-runtime override. The selector is backed by
|
|
// the existing Redis client and the global logger. The handler is
|
|
// ALWAYS constructed, even when Redis is briefly unavailable at startup,
|
|
// so the POST /api/v1/admin/canvas-runtime/:tenant_id endpoint stays
|
|
// registered and returns the explicit ErrSelectorNotConfigured (HTTP 500)
|
|
// path until Redis recovers. The previous behaviour — skipping handler
|
|
// construction when rdb == nil — silently removed the route until the
|
|
// next process restart, so a transient Redis blip at boot stranded
|
|
// canary operators with a 404 they could not diagnose from the client
|
|
// side. Review follow-up: keep the route hot.
|
|
var adminRuntimeSelector *runtime.Selector
|
|
if rdb := cache.Get().GetClient(); rdb != nil {
|
|
adminRuntimeSelector = runtime.NewSelector(rdb, common.Logger)
|
|
}
|
|
adminRuntimeHandler := handler.NewAdminRuntimeHandler(adminRuntimeSelector)
|
|
|
|
// Initialize router
|
|
r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, skillSearchHandler, providerHandler, agentHandler, searchBotHandler, difyRetrievalHandler, pluginHandler, modelHandler, fileCommitHandler, adminRuntimeHandler)
|
|
|
|
// Create Gin engine
|
|
ginEngine := gin.New()
|
|
|
|
// Middleware
|
|
if config.Server.Mode == "debug" {
|
|
ginEngine.Use(gin.Logger())
|
|
}
|
|
ginEngine.Use(gin.Recovery())
|
|
|
|
// Setup routes
|
|
r.Setup(ginEngine)
|
|
|
|
// Create HTTP server with timeouts to prevent slow clients from blocking shutdown
|
|
addr := fmt.Sprintf(":%d", config.Server.Port)
|
|
srv := &http.Server{
|
|
Addr: addr,
|
|
Handler: ginEngine,
|
|
ReadHeaderTimeout: 10 * time.Second,
|
|
ReadTimeout: 60 * time.Second,
|
|
WriteTimeout: 120 * time.Second,
|
|
IdleTimeout: 120 * time.Second,
|
|
}
|
|
|
|
// Start server in a goroutine
|
|
go func() {
|
|
common.Info(
|
|
"\n ____ ___ ______ ______ __\n" +
|
|
" / __ \\ / | / ____// ____// /____ _ __\n" +
|
|
" / /_/ // /| | / / __ / /_ / // __ \\| | /| / /\n" +
|
|
" / _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ /\n" +
|
|
" /_/ |_|/_/ |_|\\____//_/ /_/ \\____/ |__/|__/\n",
|
|
)
|
|
common.Info(fmt.Sprintf("RAGFlow Go Version: %s", utility.GetRAGFlowVersion()))
|
|
common.Info(fmt.Sprintf("Server starting on port: %d", config.Server.Port))
|
|
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
common.Fatal("Failed to start server", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
// Get local IP address for heartbeat reporting
|
|
localIP, err := utility.GetLocalIP()
|
|
if err != nil {
|
|
common.Fatal("fail to get local ip address")
|
|
}
|
|
|
|
// Initialize and start heartbeat reporter to admin server
|
|
service.AdminServiceClient = service.NewAdminClient(
|
|
common.Logger,
|
|
common.ServerTypeAPI,
|
|
fmt.Sprintf("ragflow-server-%d", config.Server.Port),
|
|
localIP,
|
|
config.Server.Port,
|
|
)
|
|
if err = service.AdminServiceClient.InitHTTPClient(); err != nil {
|
|
common.Warn("Failed to initialize heartbeat service", zap.Error(err))
|
|
} else {
|
|
// Start heartbeat reporter with 30 seconds interval
|
|
heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() {
|
|
if err = service.AdminServiceClient.SendHeartbeat(); err == nil {
|
|
local.SetAdminStatus(0, "")
|
|
} else {
|
|
local.SetAdminStatus(1, err.Error())
|
|
//logger.Warn(fmt.Sprintf(err.Error()))
|
|
}
|
|
})
|
|
heartbeatReporter.Start()
|
|
defer heartbeatReporter.Stop()
|
|
}
|
|
|
|
// Wait for interrupt signal to gracefully shutdown
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2)
|
|
sig := <-quit
|
|
|
|
common.Info(fmt.Sprintf("Receives %s signal to shutdown server", strings.ToUpper(sig.String())))
|
|
common.Info("Shutting down server...")
|
|
|
|
// Create context with timeout for graceful shutdown
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Shutdown server
|
|
if err = srv.Shutdown(ctx); err != nil {
|
|
common.Fatal("Server forced to shutdown", zap.Error(err))
|
|
}
|
|
}
|