From 9ce766192ff0690f6c393f4280aaae4c294ce22b Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Fri, 20 Mar 2026 13:15:41 +0800 Subject: [PATCH] Init storage engine (#13707) ### What problem does this PR solve? 1. Init Minio / S3 / OSS 2. Fix minio / s3 / oss config ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai --- cmd/server_main.go | 5 ++ internal/server/config.go | 92 +++++++++++++++----- internal/storage/minio.go | 5 +- internal/storage/oss.go | 9 +- internal/storage/s3.go | 15 ++-- internal/storage/storage_factory.go | 127 ++++++---------------------- 6 files changed, 115 insertions(+), 138 deletions(-) diff --git a/cmd/server_main.go b/cmd/server_main.go index baaf72ad39..c22f19670f 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -10,6 +10,7 @@ import ( "ragflow/internal/common" "ragflow/internal/server" "ragflow/internal/server/local" + "ragflow/internal/storage" "ragflow/internal/utility" "strings" "syscall" @@ -118,6 +119,10 @@ func main() { } defer cache.Close() + if err := storage.InitStorageFactory(); err != nil { + logger.Fatal("Failed to initialize storage factory", zap.Error(err)) + } + // Initialize server variables (runtime variables that can change during operation) // This must be done after Cache is initialized if err := server.InitVariables(cache.Get()); err != nil { diff --git a/internal/server/config.go b/internal/server/config.go index deb4ce10cb..8892761ed5 100644 --- a/internal/server/config.go +++ b/internal/server/config.go @@ -159,8 +159,8 @@ const ( // OSSConfig holds Aliyun OSS storage configuration // OSS is compatible with S3 API type OSSConfig struct { - AccessKeyID string `mapstructure:"access_key"` // OSS Access Key ID - SecretAccessKey string `mapstructure:"secret_key"` // OSS Secret Access Key + AccessKey string `mapstructure:"access_key"` // OSS Access Key ID + SecretKey string `mapstructure:"secret_key"` // OSS Secret Access Key EndpointURL string 
`mapstructure:"endpoint_url"` // OSS Endpoint (e.g., "https://oss-cn-hangzhou.aliyuncs.com") Region string `mapstructure:"region"` // Region (e.g., "cn-hangzhou") Bucket string `mapstructure:"bucket"` // Default bucket (optional) @@ -182,10 +182,10 @@ type MinioConfig struct { // S3Config holds AWS S3 storage configuration type S3Config struct { - AccessKeyID string `mapstructure:"access_key"` // AWS Access Key ID - SecretAccessKey string `mapstructure:"secret_key"` // AWS Secret Access Key - SessionToken string `mapstructure:"session_token"` // AWS Session Token (optional) + AccessKey string `mapstructure:"access_key"` // AWS Access Key ID + SecretKey string `mapstructure:"secret_key"` // AWS Secret Access Key Region string `mapstructure:"region_name"` // AWS Region + SessionToken string `mapstructure:"session_token"` // AWS Session Token (optional) EndpointURL string `mapstructure:"endpoint_url"` // Custom endpoint (optional) SignatureVersion string `mapstructure:"signature_version"` // Signature version AddressingStyle string `mapstructure:"addressing_style"` // Addressing style @@ -425,17 +425,22 @@ func FromEnvironments() error { } // Storage - //storageType := strings.ToLower(os.Getenv("STORAGE_IMPL")) - //switch storageType { - //case "minio": - // globalConfig.StorageEngine.Type = StorageMinio - //case "s3": - // globalConfig.StorageEngine.Type = StorageS3 - //case "oss": - // globalConfig.StorageEngine.Type = StorageOSS - //default: - // return fmt.Errorf("invalid storage type: %s", storageType) - //} + storageType := strings.ToLower(os.Getenv("STORAGE_IMPL")) + switch storageType { + case "minio": + globalConfig.StorageEngine.Type = StorageMinio + case "s3": + globalConfig.StorageEngine.Type = StorageS3 + case "oss": + globalConfig.StorageEngine.Type = StorageOSS + case "": + // Default + if globalConfig.StorageEngine.Type == "" { + globalConfig.StorageEngine.Type = StorageMinio + } + default: + return fmt.Errorf("invalid storage type: %s", storageType) + 
} // Language if globalConfig.Language == "" { @@ -566,11 +571,6 @@ func FromConfigFile(configPath string) error { // Map doc_engine section to DocEngineConfig if globalConfig != nil && globalConfig.DocEngine.Type == "" { - // Use DOC_ENGINE env var if set - //if docEngine != "" { - // globalConfig.DocEngine.Type = EngineType(docEngine) - //} - // Try to map from doc_engine section (overrides env var if present) if v.IsSet("doc_engine") { docEngineConfig := v.Sub("doc_engine") if docEngineConfig != nil { @@ -610,6 +610,56 @@ func FromConfigFile(configPath string) error { } } + if globalConfig != nil && globalConfig.StorageEngine.Type == "" { + // Map the minio / s3 / oss sections to the StorageEngine config when not already set + if v.IsSet("minio") { + minioConfig := v.Sub("minio") + if minioConfig != nil { + if globalConfig.StorageEngine.Minio == nil { + globalConfig.StorageEngine.Minio = &MinioConfig{ + Host: minioConfig.GetString("host"), + User: minioConfig.GetString("user"), + Password: minioConfig.GetString("password"), + Secure: minioConfig.GetBool("secure"), + PrefixPath: minioConfig.GetString("prefix_path"), + Verify: minioConfig.GetBool("verify"), + Bucket: minioConfig.GetString("bucket"), + } + } + } + } + + if v.IsSet("s3") { + s3Config := v.Sub("s3") + if s3Config != nil { + if globalConfig.StorageEngine.S3 == nil { + globalConfig.StorageEngine.S3 = &S3Config{ + AccessKey: s3Config.GetString("access_key"), + SecretKey: s3Config.GetString("secret_key"), + Region: s3Config.GetString("region"), + } + } + } + } + + if v.IsSet("oss") { + ossConfig := v.Sub("oss") + if ossConfig != nil { + if globalConfig.StorageEngine.OSS == nil { + globalConfig.StorageEngine.OSS = &OSSConfig{ + AccessKey: ossConfig.GetString("access_key"), + SecretKey: ossConfig.GetString("secret_key"), + EndpointURL: ossConfig.GetString("endpoint_url"), + Region: ossConfig.GetString("region"), + Bucket: ossConfig.GetString("bucket"), + SignatureVersion: ossConfig.GetString("signature_version"), + AddressingStyle:
ossConfig.GetString("addressing_style"), + } + } + } + } + } + // Map user_default_llm section to UserDefaultLLMConfig if v.IsSet("user_default_llm") { userDefaultLLMConfig := v.Sub("user_default_llm") diff --git a/internal/storage/minio.go b/internal/storage/minio.go index 7c0c02746e..496f56c9d0 100644 --- a/internal/storage/minio.go +++ b/internal/storage/minio.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "fmt" "net/http" + "ragflow/internal/server" "time" "github.com/minio/minio-go/v7" @@ -34,11 +35,11 @@ type MinioStorage struct { client *minio.Client bucket string prefixPath string - config *MinioConfig + config *server.MinioConfig } // NewMinioStorage creates a new MinIO storage instance -func NewMinioStorage(config *MinioConfig) (*MinioStorage, error) { +func NewMinioStorage(config *server.MinioConfig) (*MinioStorage, error) { storage := &MinioStorage{ bucket: config.Bucket, prefixPath: config.PrefixPath, diff --git a/internal/storage/oss.go b/internal/storage/oss.go index a61baa6cb8..11146cb1ef 100644 --- a/internal/storage/oss.go +++ b/internal/storage/oss.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "ragflow/internal/server" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -37,11 +38,11 @@ type OSSStorage struct { client *s3.Client bucket string prefixPath string - config *OSSConfig + config *server.OSSConfig } // NewOSSStorage creates a new OSS storage instance -func NewOSSStorage(config *OSSConfig) (*OSSStorage, error) { +func NewOSSStorage(config *server.OSSConfig) (*OSSStorage, error) { storage := &OSSStorage{ bucket: config.Bucket, prefixPath: config.PrefixPath, @@ -60,8 +61,8 @@ func (o *OSSStorage) connect() error { // Create static credentials creds := credentials.NewStaticCredentialsProvider( - o.config.AccessKeyID, - o.config.SecretAccessKey, + o.config.AccessKey, + o.config.SecretKey, "", ) diff --git a/internal/storage/s3.go b/internal/storage/s3.go index 6533e9a4cd..5c3addfc15 100644 --- a/internal/storage/s3.go +++ 
b/internal/storage/s3.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "ragflow/internal/server" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -36,15 +37,13 @@ type S3Storage struct { client *s3.Client bucket string prefixPath string - config *S3Config + config *server.S3Config } // NewS3Storage creates a new S3 storage instance -func NewS3Storage(config *S3Config) (*S3Storage, error) { +func NewS3Storage(config *server.S3Config) (*S3Storage, error) { storage := &S3Storage{ - bucket: config.Bucket, - prefixPath: config.PrefixPath, - config: config, + config: config, } if err := storage.connect(); err != nil { @@ -65,10 +64,10 @@ func (s *S3Storage) connect() error { } // Configure credentials if provided - if s.config.AccessKeyID != "" && s.config.SecretAccessKey != "" { + if s.config.AccessKey != "" && s.config.SecretKey != "" { creds := credentials.NewStaticCredentialsProvider( - s.config.AccessKeyID, - s.config.SecretAccessKey, + s.config.AccessKey, + s.config.SecretKey, s.config.SessionToken, ) opts = append(opts, config.WithCredentialsProvider(creds)) diff --git a/internal/storage/storage_factory.go b/internal/storage/storage_factory.go index d63ef5017e..3ee45606df 100644 --- a/internal/storage/storage_factory.go +++ b/internal/storage/storage_factory.go @@ -18,12 +18,9 @@ package storage import ( "fmt" - "os" + "ragflow/internal/logger" "ragflow/internal/server" "sync" - - "github.com/spf13/viper" - "go.uber.org/zap" ) var ( @@ -48,81 +45,37 @@ func GetStorageFactory() *StorageFactory { } // InitStorageFactory initializes the storage factory with configuration -func InitStorageFactory(v *viper.Viper) error { +func InitStorageFactory() error { factory := GetStorageFactory() - // Get storage type from environment or config - storageType := os.Getenv("STORAGE_IMPL") - if storageType == "" { - storageType = v.GetString("storage_type") - } - if storageType == "" { - storageType = "MINIO" // Default storage type - } - - storageConfig := 
&server.StorageConfig{} - if err := v.UnmarshalKey("storage", storageConfig); err != nil { - return fmt.Errorf("failed to unmarshal storage config: %w", err) - } - storageConfig.StorageType = storageType - - factory.config = storageConfig - + globalConfig := server.GetConfig() + factory.config = &globalConfig.StorageEngine // Initialize storage based on type - if err := factory.initStorage(storageType, v); err != nil { + if err := factory.initStorage(); err != nil { return err } - zap.L().Info("Storage factory initialized", - zap.String("storage_type", storageType), - ) + logger.Info(fmt.Sprintf("Storage initialized: %s", factory.config.Type)) return nil } // initStorage initializes the specific storage implementation -func (f *StorageFactory) initStorage(storageType string, v *viper.Viper) error { - switch storageType { - case "MINIO": - return f.initMinio(v) - case "AWS_S3": - return f.initS3(v) - case "OSS": - return f.initOSS(v) +func (f *StorageFactory) initStorage() error { + switch f.config.Type { + case "minio": + return f.initMinio(f.config.Minio) + case "s3": + return f.initS3(f.config.S3) + case "oss": + return f.initOSS(f.config.OSS) default: - return fmt.Errorf("unsupported storage type: %s", storageType) + return fmt.Errorf("unsupported storage type: %s", f.config.Type) } } -func (f *StorageFactory) initMinio(v *viper.Viper) error { - config := &server.MinioConfig{} - - // Try to load from minio section first - if v.IsSet("minio") { - minioConfig := v.Sub("minio") - if minioConfig != nil { - config.Host = minioConfig.GetString("host") - config.User = minioConfig.GetString("user") - config.Password = minioConfig.GetString("password") - config.Secure = minioConfig.GetBool("secure") - config.Verify = minioConfig.GetBool("verify") - config.Bucket = minioConfig.GetString("bucket") - config.PrefixPath = minioConfig.GetString("prefix_path") - } - } - - // Apply defaults - if config.Host == "" { - config.Host = "localhost:9000" - } - if config.User == "" { - 
config.User = "minioadmin" - } - if config.Password == "" { - config.Password = "minioadmin" - } - - storage, err := NewMinioStorage(config) +func (f *StorageFactory) initMinio(minioConfig *server.MinioConfig) error { + storage, err := NewMinioStorage(minioConfig) if err != nil { return fmt.Errorf("failed to create MinIO storage: %w", err) } @@ -131,30 +84,13 @@ func (f *StorageFactory) initMinio(v *viper.Viper) error { defer f.mu.Unlock() f.storageType = StorageMinio f.storage = storage - f.config.Minio = config + f.config.Minio = minioConfig return nil } -func (f *StorageFactory) initS3(v *viper.Viper) error { - config := &server.S3Config{} - - if v.IsSet("s3") { - s3Config := v.Sub("s3") - if s3Config != nil { - config.AccessKeyID = s3Config.GetString("access_key") - config.SecretAccessKey = s3Config.GetString("secret_key") - config.SessionToken = s3Config.GetString("session_token") - config.Region = s3Config.GetString("region_name") - config.EndpointURL = s3Config.GetString("endpoint_url") - config.SignatureVersion = s3Config.GetString("signature_version") - config.AddressingStyle = s3Config.GetString("addressing_style") - config.Bucket = s3Config.GetString("bucket") - config.PrefixPath = s3Config.GetString("prefix_path") - } - } - - storage, err := NewS3Storage(config) +func (f *StorageFactory) initS3(s3Config *server.S3Config) error { + storage, err := NewS3Storage(s3Config) if err != nil { return fmt.Errorf("failed to create S3 storage: %w", err) } @@ -163,29 +99,14 @@ func (f *StorageFactory) initS3(v *viper.Viper) error { defer f.mu.Unlock() f.storageType = StorageAWSS3 f.storage = storage - f.config.S3 = config + f.config.S3 = s3Config return nil } -func (f *StorageFactory) initOSS(v *viper.Viper) error { - config := &server.OSSConfig{} +func (f *StorageFactory) initOSS(ossConfig *server.OSSConfig) error { - if v.IsSet("oss") { - ossConfig := v.Sub("oss") - if ossConfig != nil { - config.AccessKeyID = ossConfig.GetString("access_key") - 
config.SecretAccessKey = ossConfig.GetString("secret_key") - config.EndpointURL = ossConfig.GetString("endpoint_url") - config.Region = ossConfig.GetString("region") - config.Bucket = ossConfig.GetString("bucket") - config.PrefixPath = ossConfig.GetString("prefix_path") - config.SignatureVersion = ossConfig.GetString("signature_version") - config.AddressingStyle = ossConfig.GetString("addressing_style") - } - } - - storage, err := NewOSSStorage(config) + storage, err := NewOSSStorage(ossConfig) if err != nil { return fmt.Errorf("failed to create OSS storage: %w", err) } @@ -194,7 +115,7 @@ func (f *StorageFactory) initOSS(v *viper.Viper) error { defer f.mu.Unlock() f.storageType = StorageOSS f.storage = storage - f.config.OSS = config + f.config.OSS = ossConfig return nil }