diff --git a/cmd/admin_server.go b/cmd/admin_server.go index 0870f52fcc..487b5d1a7b 100644 --- a/cmd/admin_server.go +++ b/cmd/admin_server.go @@ -74,7 +74,7 @@ func main() { } // Initialize logger - if err := common.Init("info", "admin_server.log"); err != nil { + if err := common.Init("info", common.FileOutput{Path: "admin_server.log"}); err != nil { panic("failed to initialize logger: " + err.Error()) } @@ -96,7 +96,17 @@ func main() { logLevel = "debug" } - if err := common.Init(logLevel, "admin_server.log"); err != nil { + fileOut := common.FileOutput{ + Path: "admin_server.log", + MaxSize: cfg.Log.MaxSize, + MaxBackups: cfg.Log.MaxBackups, + MaxAge: cfg.Log.MaxAge, + Compress: common.ResolveCompress(cfg.Log.Compress), + } + if cfg.Log.Path != "" { + fileOut.Path = cfg.Log.Path + } + if err := common.Init(logLevel, fileOut); err != nil { common.Error("Failed to reinitialize logger with configured level", err) } @@ -158,15 +168,8 @@ func main() { ginEngine := gin.New() // Middleware - if cfg.Server.Mode == "debug" { - ginEngine.Use(gin.Logger()) - } + ginEngine.Use(common.GinLogger()) ginEngine.Use(gin.Recovery()) - // Log request URL for every request - ginEngine.Use(func(c *gin.Context) { - common.Info("HTTP Request", zap.String("url", c.Request.URL.String()), zap.String("method", c.Request.Method)) - c.Next() - }) // Setup routes r.Setup(ginEngine) diff --git a/cmd/ingestor.go b/cmd/ingestor.go index e1bc2e2219..200ddfd76d 100644 --- a/cmd/ingestor.go +++ b/cmd/ingestor.go @@ -90,7 +90,7 @@ func main() { } // Initialize logger with default level - if err := common.Init("info", "ingestion_server.log"); err != nil { + if err := common.Init("info", common.FileOutput{Path: "ingestion_server.log"}); err != nil { panic(fmt.Sprintf("Failed to initialize logger: %v", err)) } @@ -123,7 +123,17 @@ func main() { level = "debug" } - if err := common.Init(level, "ingestion_server.log"); err != nil { + fileOut := common.FileOutput{ + Path: "ingestion_server.log", + MaxSize: config.Log.MaxSize, + MaxBackups: config.Log.MaxBackups, + MaxAge: config.Log.MaxAge, + Compress: common.ResolveCompress(config.Log.Compress), + } + if config.Log.Path != "" { + fileOut.Path = config.Log.Path + } + if err := common.Init(level, fileOut); err != nil { common.Error("Failed to reinitialize logger", err) } server.SetLogger(common.Logger) diff --git a/cmd/ragflow_cli.go b/cmd/ragflow_cli.go index 9f5f7fb961..be5c3dfa8d 100644 --- a/cmd/ragflow_cli.go +++ b/cmd/ragflow_cli.go @@ -1,4 +1,5 @@ //go:build ignore + // // Copyright 2026 The InfiniFlow Authors. All Rights Reserved. // @@ -45,7 +46,7 @@ func main() { logLevel = "info" } - if err = common.Init(logLevel, ""); err != nil { + if err = common.Init(logLevel, common.FileOutput{}); err != nil { fmt.Printf("Warning: Failed to initialize logger: %v\n", err) } diff --git a/cmd/server_main.go b/cmd/server_main.go index 03304b9bd0..82a746de1e 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -92,7 +92,7 @@ func main() { // Initialize logger with default level // logger.Init("info"); // set debug log level - if err := common.Init("info", "server_main.log"); err != nil { + if err := common.Init("info", common.FileOutput{Path: "server_main.log"}); err != nil { panic(fmt.Sprintf("Failed to initialize logger: %v", err)) } @@ -122,7 +122,17 @@ func main() { level = "debug" } - if err := common.Init(level, "server_main.log"); err != nil { + fileOut := common.FileOutput{ + Path: "server_main.log", + MaxSize: config.Log.MaxSize, + MaxBackups: config.Log.MaxBackups, + MaxAge: config.Log.MaxAge, + Compress: common.ResolveCompress(config.Log.Compress), + } + if config.Log.Path != "" { + fileOut.Path = config.Log.Path + } + if err := common.Init(level, fileOut); err != nil { common.Error("Failed to reinitialize logger", err) } server.SetLogger(common.Logger) @@ -317,9 +327,10 @@ func startServer(config *server.Config) { ginEngine := gin.New() // Middleware - if config.Server.Mode == "debug" { - ginEngine.Use(gin.Logger()) - } + // Note: common.GinLogger() is registered inside router.Setup so the + // HTTP request log captures every endpoint the router owns (including + // those registered by Setup itself). Registering it here would run + // it twice for those endpoints and double every access-log line. ginEngine.Use(gin.Recovery()) // Setup routes diff --git a/docker/service_conf.yaml.template b/docker/service_conf.yaml.template index 21bbfaca27..d7681c1e0a 100644 --- a/docker/service_conf.yaml.template +++ b/docker/service_conf.yaml.template @@ -177,3 +177,11 @@ user_default_llm: # secret_id: '${TENCENT_SECRET_ID}' # secret_key: '${TENCENT_SECRET_KEY}' # region: '${TENCENT_REGION}' +# log: +# level: info # debug | info | warn | error +# format: text # text | json (reserved; text only today) +# path: server_main.log # empty/missing = binary default (server_main.log / admin_server.log / ingestion_server.log) +# max_size: 100 # MB before rotation +# max_backups: 10 # retained rotated files +# max_age: 30 # days +# compress: true # gzip rotated files; default true. Set false to disable. diff --git a/go.mod b/go.mod index eb796c1d86..dabf9ec028 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( golang.org/x/term v0.43.0 google.golang.org/genai v1.54.0 google.golang.org/grpc v1.81.1 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.5.2 gorm.io/gorm v1.25.7 diff --git a/go.sum b/go.sum index 3b80e71a58..bf793615bb 100644 --- a/go.sum +++ b/go.sum @@ -703,6 +703,8 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/ini.v1 v1.56.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/agent/canvas/compile.go b/internal/agent/canvas/compile.go index c290410c39..aff844ce63 100644 --- a/internal/agent/canvas/compile.go +++ b/internal/agent/canvas/compile.go @@ -11,10 +11,12 @@ package canvas import ( "context" "fmt" - "log" "strings" "github.com/cloudwego/eino/compose" + "go.uber.org/zap" + + "ragflow/internal/common" ) // CheckPointStore is the minimal interface Compile needs at compile time. @@ -115,7 +117,7 @@ func Compile(ctx context.Context, c *Canvas, opts ...CompileOption) (*CompiledCa } } if n > 0 { - log.Printf("canvas: Compile received Canvas with %d legacy LoopItem/IterationItem/Iteration nodes; this path bypassed dsl.NormalizeForCanvas — the fold step is not applied", n) + common.Info("canvas: Compile received Canvas with legacy LoopItem/IterationItem/Iteration nodes; this path bypassed dsl.NormalizeForCanvas — the fold step is not applied", zap.Int("n", n)) } } diff --git a/internal/agent/canvas/compile_test.go b/internal/agent/canvas/compile_test.go index c9fbbc8a74..f4e818f81c 100644 --- a/internal/agent/canvas/compile_test.go +++ b/internal/agent/canvas/compile_test.go @@ -18,29 +18,40 @@ package canvas import ( "bytes" "context" - "log" "strings" "testing" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "ragflow/internal/common" ) // TestCompile_LogsWhenLegacyNodesPresent exercises the // decoder-bypass guard in Compile: a Canvas that carries // LoopItem/IterationItem entries in `Components` (i.e. one that // never went through dsl.NormalizeForCanvas) must produce a -// visible stderr warning. The guard is intentionally a log, not -// a panic, so internal drivers / legacy fixtures can still drive -// Compile; the log makes the regression observable. +// visible warning through common.Logger. The guard is +// intentionally a log, not a panic, so internal drivers / legacy +// fixtures can still drive Compile; the log makes the regression +// observable. // -// The test redirects log output to a buffer and asserts the -// expected substring. We don't fail on `Compile` itself failing — -// the legacy fixture graph is intentionally minimal and may not -// compile end-to-end without a Begin node; the assertion is -// strictly about the log surface. +// The test swaps common.Logger for a buffer-backed encoder so +// we can assert on the structured log message. We don't fail on +// `Compile` itself failing — the legacy fixture graph is +// intentionally minimal and may not compile end-to-end without a +// Begin node; the assertion is strictly about the log surface. func TestCompile_LogsWhenLegacyNodesPresent(t *testing.T) { var buf bytes.Buffer - prev := log.Writer() - log.SetOutput(&buf) - t.Cleanup(func() { log.SetOutput(prev) }) + prev := common.Logger + common.Logger = zap.New( + zapcore.NewCore( + zapcore.NewConsoleEncoder(zap.NewProductionEncoderConfig()), + zapcore.AddSync(&buf), + zapcore.InfoLevel, + ), + ) + t.Cleanup(func() { common.Logger = prev }) c := &Canvas{ Components: map[string]CanvasComponent{ @@ -76,9 +87,15 @@ func TestCompile_LogsWhenLegacyNodesPresent(t *testing.T) { // every Compile. func TestCompile_NoLogOnCleanCanvas(t *testing.T) { var buf bytes.Buffer - prev := log.Writer() - log.SetOutput(&buf) - t.Cleanup(func() { log.SetOutput(prev) }) + prev := common.Logger + common.Logger = zap.New( + zapcore.NewCore( + zapcore.NewConsoleEncoder(zap.NewProductionEncoderConfig()), + zapcore.AddSync(&buf), + zapcore.InfoLevel, + ), + ) + t.Cleanup(func() { common.Logger = prev }) c := &Canvas{ Components: map[string]CanvasComponent{ diff --git a/internal/agent/canvas/runner.go b/internal/agent/canvas/runner.go index 9fa6b86abc..1902926027 100644 --- a/internal/agent/canvas/runner.go +++ b/internal/agent/canvas/runner.go @@ -52,13 +52,15 @@ import ( "encoding/json" "errors" "fmt" - "log" "runtime/debug" "strings" "sync" "time" "github.com/google/uuid" + "go.uber.org/zap" + + "ragflow/internal/common" ) // RunEvent is the unit the Runner pushes onto its output channel. @@ -280,7 +282,10 @@ func (r *Runner) Run( // surfaces a clear root cause in the server log. defer func() { if rec := recover(); rec != nil { - log.Printf("canvas runner PANIC canvas=%q session=%q: %v\n%s", canvasID, sessionID, rec, debug.Stack()) + common.Error("canvas runner PANIC", fmt.Errorf("%v", rec), + zap.String("canvas", canvasID), + zap.String("session", sessionID), + zap.String("stack", string(debug.Stack()))) } }() @@ -309,7 +314,13 @@ func (r *Runner) Run( // the prompt to the visible waiting node. displayID := FirstInterruptID(ctxs) resumeID := RootInterruptID(ctxs) - log.Printf("canvas runner interrupt canvas=%q session=%q task=%q contexts=%s display=%q resume=%q", canvasID, sessionID, taskID, formatInterruptContexts(ctxs), displayID, resumeID) + common.Info("canvas runner interrupt", + zap.String("canvas", canvasID), + zap.String("session", sessionID), + zap.String("task", taskID), + zap.String("contexts", formatInterruptContexts(ctxs)), + zap.String("display", displayID), + zap.String("resume", resumeID)) r.saveInterruptID(canvasID, sessionID, resumeID) waiting := WaitingForUserEvent{CpnID: displayID} if ctx := FirstUserFillUpInterrupt(ctxs); ctx != nil { @@ -409,7 +420,8 @@ func safeInvoke(ctx context.Context, cancel chan struct{}, run RunFunc, root map // runner emit a terminal `done` event. defer func() { if rec := recover(); rec != nil { - log.Printf("canvas runner PANIC: %v\n%s", rec, debug.Stack()) + common.Error("canvas runner PANIC", fmt.Errorf("%v", rec), + zap.String("stack", string(debug.Stack()))) err = fmt.Errorf("canvas runner panic: %v", rec) } close(done) diff --git a/internal/agent/canvas/scheduler.go b/internal/agent/canvas/scheduler.go index 0149c83d44..6324877084 100644 --- a/internal/agent/canvas/scheduler.go +++ b/internal/agent/canvas/scheduler.go @@ -22,14 +22,15 @@ import ( "context" "encoding/json" "fmt" - "log" "strings" "time" "ragflow/internal/agent/runtime" "ragflow/internal/agent/workflowx" + "ragflow/internal/common" "github.com/cloudwego/eino/compose" + "go.uber.org/zap" ) // ctxKey is the unexported context-key type for per-run metadata @@ -233,7 +234,7 @@ func emitEventFromCtx(ctx context.Context, ev RunEvent) { // node_started RunEvent. Called from the per-node statePre wrapper. // Metadata (message/task/session ids) is read from ctx via RunMeta. func nodeStartedAt(ctx context.Context, state *CanvasState, cpnID, componentName, componentType string) { - log.Printf("DEBUG node_started: cpnID=%s componentName=%s", cpnID, componentName) + common.Debug("node_started", zap.String("cpnID", cpnID), zap.String("componentName", componentName)) if state == nil { return } diff --git a/internal/agent/canvas/stream.go b/internal/agent/canvas/stream.go index 0868596a5d..d6d3be72d0 100644 --- a/internal/agent/canvas/stream.go +++ b/internal/agent/canvas/stream.go @@ -20,7 +20,10 @@ package canvas import ( "encoding/json" - "log" + + "go.uber.org/zap" + + "ragflow/internal/common" ) // StreamEvent is the unit emitted by canvas components to the SSE writer. @@ -69,8 +72,9 @@ func (e *channelEmitter) Emit(ev StreamEvent) error { case e.ch <- ev: return nil default: - log.Printf("canvas stream: dropping event %q for task %q (buffer full)", - ev.Event, ev.TaskID) + common.Warn("canvas stream: dropping event (buffer full)", + zap.String("event", ev.Event), + zap.String("task", ev.TaskID)) return nil } } diff --git a/internal/agent/component/agent.go b/internal/agent/component/agent.go index 1e3cf85161..e2d18bad42 100644 --- a/internal/agent/component/agent.go +++ b/internal/agent/component/agent.go @@ -14,7 +14,6 @@ package component import ( "context" "fmt" - "log" "strings" einotool "github.com/cloudwego/eino/components/tool" @@ -25,7 +24,10 @@ import ( "ragflow/internal/agent/component/prompts" "ragflow/internal/agent/runtime" agenttool "ragflow/internal/agent/tool" + "ragflow/internal/common" "ragflow/internal/entity/models" + + "go.uber.org/zap" ) // agentLLMIDPattern matches `@` and @@ -471,10 +473,17 @@ func (c *AgentComponent) Invoke(ctx context.Context, inputs map[string]any) (map // agentRunner state right before the `msg.Content` deref so a // subsequent panic shows whether the agent returned (nil, nil). if msg == nil { - log.Printf("DEBUG agent.Invoke: msg is NIL after agentRunner — driver=%q modelID=%q userPrompt_len=%d err=%v", p.Driver, p.ModelID, len(p.UserPrompt), err) + common.Debug("agent.Invoke: msg is NIL after agentRunner", + zap.String("driver", p.Driver), + zap.String("modelID", p.ModelID), + zap.Int("userPrompt_len", len(p.UserPrompt)), + zap.Error(err)) return nil, fmt.Errorf("component: Agent.Invoke: agent runner returned nil message (driver=%q modelID=%q): %w", p.Driver, p.ModelID, err) } - log.Printf("DEBUG agent.Invoke: msg OK driver=%q modelID=%q content_len=%d", p.Driver, p.ModelID, len(msg.Content)) + common.Debug("agent.Invoke: msg OK", + zap.String("driver", p.Driver), + zap.String("modelID", p.ModelID), + zap.Int("content_len", len(msg.Content))) content := msg.Content var groundingStatus string if p.Cite { diff --git a/internal/agent/component/llm.go b/internal/agent/component/llm.go index a3fc5ea039..61cccb3783 100644 --- a/internal/agent/component/llm.go +++ b/internal/agent/component/llm.go @@ -14,7 +14,6 @@ import ( "context" "encoding/json" "fmt" - "log" "regexp" "slices" "sort" @@ -26,7 +25,10 @@ import ( "ragflow/internal/agent/component/prompts" "ragflow/internal/agent/runtime" + "ragflow/internal/common" "ragflow/internal/entity/models" + + "go.uber.org/zap" ) // LLMComponent is a one-shot chat call. @@ -330,13 +332,13 @@ func (c *LLMComponent) Invoke(ctx context.Context, inputs map[string]any) (map[s if resolved, rerr := runtime.ResolveTemplate(p.SystemPrompt, state); resolved != p.SystemPrompt || rerr == nil { p.SystemPrompt = resolved if rerr != nil { - log.Printf("component: LLM: resolve system_prompt: %v", rerr) + common.Warn("component: LLM: resolve system_prompt", zap.Error(rerr)) } } if resolved, rerr := runtime.ResolveTemplate(p.UserPrompt, state); resolved != p.UserPrompt || rerr == nil { p.UserPrompt = resolved if rerr != nil { - log.Printf("component: LLM: resolve user_prompt: %v", rerr) + common.Warn("component: LLM: resolve user_prompt", zap.Error(rerr)) } } } @@ -433,7 +435,7 @@ func (c *LLMComponent) Invoke(ctx context.Context, inputs map[string]any) (map[s out["json"] = parsed } else { // Surface a non-fatal warning — caller can still read "content". - log.Printf("component: LLM: json_output=true but content is not valid JSON: %v", err) + common.Warn("component: LLM: json_output=true but content is not valid JSON", zap.Error(err)) } } if p.OutputStructure != nil { @@ -469,7 +471,7 @@ func (c *LLMComponent) Invoke(ctx context.Context, inputs map[string]any) (map[s // downstream consumers reading "content" get the JSON text. out["content"] = resp.Content } else { - log.Printf("component: LLM: output_structure set but no parseable JSON after retry") + common.Warn("component: LLM: output_structure set but no parseable JSON after retry") } } return out, nil diff --git a/internal/agent/component/llm_credentials.go b/internal/agent/component/llm_credentials.go index a2435e7983..03ed1fd0ba 100644 --- a/internal/agent/component/llm_credentials.go +++ b/internal/agent/component/llm_credentials.go @@ -3,12 +3,14 @@ package component import ( "context" "encoding/json" - "log" "strings" "ragflow/internal/agent/runtime" + "ragflow/internal/common" "ragflow/internal/dao" "ragflow/internal/entity" + + "go.uber.org/zap" ) // resolveTenantLLMConfig fills tenant-scoped API credentials for the supplied @@ -21,12 +23,12 @@ func resolveTenantLLMConfig(ctx context.Context, driver, modelID, apiKey, baseUR } state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx) if err != nil || state == nil { - log.Printf("DEBUG llm credentials: no canvas state in ctx") + common.Debug("llm credentials: no canvas state in ctx") return apiKey, baseURL } tid, _ := state.Sys["tenant_id"].(string) if tid == "" { - log.Printf("DEBUG llm credentials: state.Sys has no tenant_id") + common.Debug("llm credentials: state.Sys has no tenant_id") return apiKey, baseURL } @@ -45,14 +47,14 @@ func resolveTenantLLMConfig(ctx context.Context, driver, modelID, apiKey, baseUR // resolveTenantLLMCredentials looks up the old tenant_llm table for the given // tenant / factory / model. Returns true when credentials were found. func resolveTenantLLMCredentials(tid, driver, modelID, baseURL string) (string, string, bool) { - log.Printf("DEBUG llm credentials: tenant_llm lookup tid=%q factory=%q model=%q", tid, driver, modelID) + common.Debug("llm credentials: tenant_llm lookup", zap.String("tid", tid), zap.String("factory", driver), zap.String("model", modelID)) row, err := dao.NewTenantLLMDAO().GetByTenantFactoryAndModelName(tid, driver, modelID) if err != nil { - log.Printf("DEBUG llm credentials: tenant_llm lookup err=%v", err) + common.Debug("llm credentials: tenant_llm lookup", zap.Error(err)) return "", baseURL, false } if row == nil { - log.Printf("DEBUG llm credentials: tenant_llm lookup: no row") + common.Debug("llm credentials: tenant_llm lookup: no row") return "", baseURL, false } @@ -63,7 +65,9 @@ func resolveTenantLLMCredentials(tid, driver, modelID, baseURL string) (string, if baseURL == "" && row.APIBase != nil { baseURL = *row.APIBase } - log.Printf("DEBUG llm credentials: tenant_llm OK api_key=%q base_url=%q", apiKey, baseURL) + common.Debug("llm credentials: tenant_llm OK", + zap.Bool("api_key_present", apiKey != ""), + zap.Bool("base_url_present", baseURL != "")) return apiKey, baseURL, apiKey != "" } @@ -73,16 +77,19 @@ func resolveTenantLLMCredentials(tid, driver, modelID, baseURL string) (string, func resolveTenantModelInstanceCredentials(tid, compositeLLMID, baseURL string) (string, string, bool) { modelName, instanceName, providerName := parseLLMIDParts(compositeLLMID) if instanceName == "" { - log.Printf("DEBUG llm credentials: new-table fallback skipped: no instance name in %q", compositeLLMID) + common.Debug("llm credentials: new-table fallback skipped: no instance name", zap.String("composite_llm_id", compositeLLMID)) return "", baseURL, false } - log.Printf("DEBUG llm credentials: new-table fallback tid=%q provider=%q model=%q instance=%q", - tid, providerName, modelName, instanceName) + common.Debug("llm credentials: new-table fallback", + zap.String("tid", tid), + zap.String("provider", providerName), + zap.String("model", modelName), + zap.String("instance", instanceName)) provider, err := dao.NewTenantModelProviderDAO().GetByTenantIDAndProviderName(tid, providerName) if err != nil || provider == nil { - log.Printf("DEBUG llm credentials: new-table fallback: provider %q not found (err=%v)", providerName, err) + common.Debug("llm credentials: new-table fallback: provider not found", zap.String("provider", providerName), zap.Error(err)) return "", baseURL, false } @@ -90,16 +97,19 @@ func resolveTenantModelInstanceCredentials(tid, compositeLLMID, baseURL string) if err != nil || instance == nil { if instanceName == "default" { if fallback := findSoleActiveProviderInstance(provider.ID); fallback != nil { - log.Printf("DEBUG llm credentials: new-table fallback: remapped default instance to sole active instance %q for provider %q", - fallback.InstanceName, providerName) + common.Debug("llm credentials: new-table fallback: remapped default instance to sole active instance", + zap.String("instance", fallback.InstanceName), + zap.String("provider", providerName)) instance = fallback err = nil } } } if err != nil || instance == nil { - log.Printf("DEBUG llm credentials: new-table fallback: instance %q not found for provider %q (err=%v)", - instanceName, providerName, err) + common.Debug("llm credentials: new-table fallback: instance not found", + zap.String("instance", instanceName), + zap.String("provider", providerName), + zap.Error(err)) return "", baseURL, false } @@ -113,15 +123,18 @@ func resolveTenantModelInstanceCredentials(tid, compositeLLMID, baseURL string) } } - log.Printf("DEBUG llm credentials: new-table OK provider=%q instance=%q api_key=%q base_url=%q", - providerName, instance.InstanceName, apiKey, baseURL) + common.Debug("llm credentials: new-table OK", + zap.String("provider", providerName), + zap.String("instance", instance.InstanceName), + zap.Bool("api_key_present", apiKey != ""), + zap.Bool("base_url_present", baseURL != "")) return apiKey, baseURL, apiKey != "" } func findSoleActiveProviderInstance(providerID string) *entity.TenantModelInstance { instances, err := dao.NewTenantModelInstanceDAO().GetAllInstancesByProviderID(providerID) if err != nil { - log.Printf("DEBUG llm credentials: list provider instances err=%v", err) + common.Debug("llm credentials: list provider instances", zap.Error(err)) return nil } active := make([]*entity.TenantModelInstance, 0, len(instances)) diff --git a/internal/agent/component/message.go b/internal/agent/component/message.go index 0939f09516..22aad869bc 100644 --- a/internal/agent/component/message.go +++ b/internal/agent/component/message.go @@ -33,12 +33,12 @@ package component import ( "context" "fmt" - "log" "maps" "regexp" "ragflow/internal/agent/audio" "ragflow/internal/agent/runtime" + "ragflow/internal/common" ) const componentNameMessage = "Message" @@ -283,7 +283,7 @@ func (m *MessageComponent) Invoke(ctx context.Context, inputs map[string]any) (m }) if saveErr != nil { out["memory_error"] = saveErr.Error() - log.Printf("Message: memory_save failed: %v", saveErr) + common.Error("Message: memory_save failed", saveErr) } } } diff --git a/internal/agent/component/universe_a_wrappers.go b/internal/agent/component/universe_a_wrappers.go index fa33f9a7ec..4984f8a93a 100644 --- a/internal/agent/component/universe_a_wrappers.go +++ b/internal/agent/component/universe_a_wrappers.go @@ -32,13 +32,15 @@ import ( "context" "encoding/json" "fmt" - "log" "strconv" "strings" einotool "github.com/cloudwego/eino/components/tool" agenttool "ragflow/internal/agent/tool" + "ragflow/internal/common" + + "go.uber.org/zap" ) // tavilySearchComponent delegates to internal/agent/tool/TavilyTool. @@ -435,7 +437,11 @@ func (c *codeExecComponent) Invoke(ctx context.Context, inputs map[string]any) ( if rawArgs, ok := merged["arguments"].(map[string]any); ok { merged["arguments"] = resolveCodeExecArguments(rawArgs, merged) } - log.Printf("DEBUG CodeExec wrapper invoke: params=%#v inputs=%#v merged=%#v", c.params, inputs, merged) + common.Debug("CodeExec wrapper invoke", + zap.Int("params_keys", len(c.params)), + zap.Int("inputs_keys", len(inputs)), + zap.Int("merged_keys", len(merged)), + zap.Bool("has_arguments", merged["arguments"] != nil)) argsJSON, _ := json.Marshal(merged) out, err := c.inner.InvokableRun(ctx, string(argsJSON)) decoded := parseToolEnvelope(out) @@ -480,8 +486,11 @@ func applyCodeExecBusinessOutputs(decoded map[string]any, outputs map[string]any return } rawResult := resolveCodeExecBusinessResult(decoded) - log.Printf("DEBUG CodeExec wrapper: decoded=%#v resolved_raw_result=%#v content=%#v outputs=%#v", - decoded, rawResult, decoded["content"], outputs) + common.Debug("CodeExec wrapper", + zap.Int("decoded_keys", len(decoded)), + zap.Bool("has_raw_result", rawResult != nil), + zap.Bool("has_content", decoded["content"] != nil), + zap.Int("outputs_keys", len(outputs))) if existingErr, _ := decoded["_ERROR"].(string); strings.TrimSpace(existingErr) != "" { for name := range outputs { if isCodeExecSystemOutput(name) { diff --git a/internal/agent/sandbox/self_managed.go b/internal/agent/sandbox/self_managed.go index 5a07310831..d7f7f18a5f 100644 --- a/internal/agent/sandbox/self_managed.go +++ b/internal/agent/sandbox/self_managed.go @@ -61,7 +61,6 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "os" "strings" @@ -69,6 +68,9 @@ import ( "time" "github.com/google/uuid" + "go.uber.org/zap" + + "ragflow/internal/common" ) // selfManagedDefaultEndpoint is the canonical executor_manager @@ -323,8 +325,12 @@ func (p *SelfManagedProvider) ExecuteCode( "runtime_error_type": raw.RuntimeErr, "structured_result": structured, } - log.Printf("DEBUG CodeExec self_managed: http_result=%#v structured_result=%#v stdout=%q stderr=%q exit_code=%d", - raw.Result, structured, stdout, raw.Stderr, raw.ExitCode) + common.Debug("CodeExec self_managed", + zap.Any("http_result", raw.Result), + zap.Any("structured_result", structured), + zap.String("stdout", stdout), + zap.String("stderr", raw.Stderr), + zap.Int("exit_code", raw.ExitCode)) return &ExecutionResult{ Stdout: stdout, diff --git a/internal/agent/tool/code_exec.go b/internal/agent/tool/code_exec.go index 6eb38a034e..a714af71d1 100644 --- a/internal/agent/tool/code_exec.go +++ b/internal/agent/tool/code_exec.go @@ -21,12 +21,14 @@ import ( "encoding/json" "errors" "fmt" - "log" "os" "strings" "github.com/cloudwego/eino/components/tool" "github.com/cloudwego/eino/schema" + "go.uber.org/zap" + + "ragflow/internal/common" ) // ErrCodeExecSandboxMissing is returned when no sandbox client is @@ -150,7 +152,11 @@ func (c *CodeExecTool) InvokableRun(ctx context.Context, argumentsInJSON string, Arguments: args.Args, Timeout: args.Timeout, } - log.Printf("DEBUG CodeExec tool invoke: lang=%q timeout=%d arguments=%#v script=%q", req.Lang, req.Timeout, req.Arguments, req.Script) + common.Debug("CodeExec tool invoke", + zap.String("lang", req.Lang), + zap.Int("timeout", req.Timeout), + zap.Int("arguments_keys", len(req.Arguments)), + zap.Int("script_len", len(req.Script))) resp, err := client.ExecuteCode(ctx, req) if err != nil { return codeExecStubResult(err.Error()), err @@ -213,8 +219,15 @@ func codeExecResultJSON(r *SandboxResponse) (string, error) { out.ActualType = InferCodeExecActualType(out.RawResult) out.Content = RenderCodeExecCanonicalContent(out.RawResult) } - log.Printf("DEBUG CodeExec tool: structured_result=%#v resolved_value=%#v raw_result=%#v content=%q actual_type=%q stderr=%q stdout=%q", - r.StructuredResult, resolvedValue, out.RawResult, out.Content, out.ActualType, r.Stderr, r.Stdout) + common.Debug("CodeExec tool", + zap.Any("structured_result", r.StructuredResult), + zap.Any("resolved_value", resolvedValue), + zap.Any("raw_result", out.RawResult), + zap.String("content", out.Content), + zap.String("actual_type", out.ActualType), + zap.Bool("stderr_present", r.Stderr != ""), + zap.Int("stderr_len", len(r.Stderr)), + zap.Int("stdout_len", len(r.Stdout))) b, err := json.Marshal(out) if err != nil { return "", fmt.Errorf("code_exec: marshal result: %w", err) diff --git a/internal/common/logger.go b/internal/common/logger.go index 24aa274a6f..2224048e75 100644 --- a/internal/common/logger.go +++ b/internal/common/logger.go @@ -17,23 +17,48 @@ package common import ( + "errors" "fmt" "os" "path/filepath" "runtime" "strings" - "sync" + "time" + "github.com/gin-gonic/gin" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "gopkg.in/natefinch/lumberjack.v2" ) var ( Logger *zap.Logger Sugar *zap.SugaredLogger - levelMu sync.RWMutex atomicLevel zap.AtomicLevel - pkgLevels map[string]string +) + +// FileOutput describes the rotated log file destination. +// +// Path is required to enable file output; empty disables the file destination +// (stdout only). When Path is set, the file is written under ./logs/ +// and rotated by lumberjack according to MaxSize / MaxBackups / MaxAge / Compress. +// +// Numeric zero values are replaced with defaults (100 MB / 10 / 30 days) inside +// Init. Compress is left as the caller-provided value; the project default is +// applied by callers (see resolveCompress) so that "not set" can be distinguished +// from "explicitly false" via the *bool LogConfig.Compress field. +type FileOutput struct { + Path string + MaxSize int + MaxBackups int + MaxAge int + Compress bool +} + +const ( + defaultMaxSizeMB = 100 + defaultMaxBackups = 10 + defaultMaxAgeDays = 30 ) func parseZapLevel(level string) (zapcore.Level, error) { @@ -62,111 +87,93 @@ func logLevelName(level zapcore.Level) string { return strings.ToUpper(level.String()) } -func initPackageLogLevels(rootLevel zapcore.Level) { - levels := make(map[string]string) - for _, item := range strings.Split(os.Getenv("LOG_LEVELS"), ",") { - terms := strings.SplitN(item, "=", 2) - if len(terms) != 2 { - continue - } - pkgName := strings.TrimSpace(terms[0]) - if pkgName == "" { - continue - } - level, err := parseZapLevel(terms[1]) - if err != nil { - level = zapcore.InfoLevel - } - levels[pkgName] = logLevelName(level) - } - // I set it to align with python for now, we shall change it later before ragflow 1.0 - if _, ok := levels["peewee"]; !ok { - levels["peewee"] = logLevelName(zapcore.WarnLevel) - } - if _, ok := levels["pdfminer"]; !ok { - levels["pdfminer"] = logLevelName(zapcore.WarnLevel) - } - if _, ok := levels["root"]; !ok { - levels["root"] = logLevelName(rootLevel) - } - pkgLevels = levels -} - -// Init initializes the global logger -// Note: This requires zap to be installed: go get go.uber.org/zap -func Init(level string, logFile string) error { +// Init initializes the global logger. stdout is always written. If file.Path +// is non-empty, a rotated file is also written via lumberjack. +// +// Callers should pass a non-empty Path so that file logging is preserved +// (each binary's hardcoded default goes through this parameter). The empty +// path case is reserved for CLI mode where stdout is the only output. +// +// Numeric fields (MaxSize, MaxBackups, MaxAge) are defaulted to 100/10/30 +// when zero. Compress is taken as supplied. +func Init(level string, file FileOutput) error { zapLevel, err := parseZapLevel(level) if err != nil { zapLevel = zapcore.InfoLevel } - // Create atomic level for dynamic updates atomicLevel = zap.NewAtomicLevelAt(zapLevel) - // Custom encoder config to control output format encoderConfig := zapcore.EncoderConfig{ - TimeKey: "timestamp", - LevelKey: "level", - NameKey: "logger", - CallerKey: "", // Disable caller/line number - FunctionKey: "", - MessageKey: "msg", - StacktraceKey: "stacktrace", - LineEnding: zapcore.DefaultLineEnding, - EncodeLevel: zapcore.LowercaseLevelEncoder, - EncodeTime: zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05"), // Human-readable time format + TimeKey: "timestamp", + LevelKey: "level", + NameKey: "logger", + CallerKey: "", + FunctionKey: "", + MessageKey: "msg", + StacktraceKey: "stacktrace", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.LowercaseLevelEncoder, + // RFC 3339 with fixed-width millisecond precision and explicit + // timezone offset (UTC rendered as "Z", other zones as "+HH:MM" + // / "-HH:MM"). Easier to ingest than the default "2006-01-02 + // 15:04:05" layout — which had no ms and no zone — and avoids + // the variable-width output of RFC3339Nano. + EncodeTime: zapcore.TimeEncoderOfLayout("2006-01-02T15:04:05.000Z07:00"), EncodeDuration: zapcore.SecondsDurationEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, // Not used since CallerKey is empty + EncodeCaller: zapcore.ShortCallerEncoder, } - // Determine output paths - outputPaths := []string{"stdout"} - errorPaths := []string{"stderr"} - if logFile != "" { - logFile = filepath.Join("logs", logFile) - outputPaths = append(outputPaths, logFile) - errorPaths = append(errorPaths, logFile) + maxSize := file.MaxSize + if maxSize <= 0 { + maxSize = defaultMaxSizeMB + } + maxBackups := file.MaxBackups + if maxBackups <= 0 { + maxBackups = defaultMaxBackups + } + maxAge := file.MaxAge + if maxAge <= 0 { + maxAge = defaultMaxAgeDays } - // Configure zap - config := zap.Config{ - Level: atomicLevel, - Development: false, - Encoding: "console", - EncoderConfig: encoderConfig, - OutputPaths: outputPaths, - ErrorOutputPaths: errorPaths, + syncers := []zapcore.WriteSyncer{zapcore.AddSync(os.Stdout)} + if file.Path != "" { + ljLogger := &lumberjack.Logger{ + Filename: filepath.Join("logs", file.Path), + MaxSize: maxSize, + MaxBackups: maxBackups, + MaxAge: maxAge, + Compress: file.Compress, + LocalTime: true, + } + syncers = append(syncers, zapcore.AddSync(ljLogger)) } - // Build logger - logger, err := config.Build(zap.AddCallerSkip(1)) - if err != nil { - return err - } + core := zapcore.NewCore( + zapcore.NewConsoleEncoder(encoderConfig), + zap.CombineWriteSyncers(syncers...), + atomicLevel, + ) - Logger = logger - Sugar = logger.Sugar() - - levelMu.Lock() - initPackageLogLevels(zapLevel) - levelMu.Unlock() + Logger = zap.New(core, zap.AddCallerSkip(1)) + Sugar = Logger.Sugar() return nil } -// Sync flushes any buffered log entries +// Sync flushes any buffered log entries. func Sync() { if Logger != nil { _ = Logger.Sync() } } -// Fatal logs a fatal message using zap with caller info +// Fatal logs a fatal message using zap with caller info, then calls os.Exit(1). func Fatal(msg string, fields ...zap.Field) { if Logger == nil { panic("logger not initialized") } - // Get caller info (skip this function to get the actual caller) _, file, line, ok := runtime.Caller(1) if ok { fields = append(fields, zap.String("caller", fmt.Sprintf("%s:%d", file, line))) @@ -174,7 +181,7 @@ func Fatal(msg string, fields ...zap.Field) { Logger.Fatal(msg, fields...) } -// Info logs an info message using zap or standard logger +// Info logs an info message. func Info(msg string, fields ...zap.Field) { if Logger == nil { return @@ -182,15 +189,19 @@ func Info(msg string, fields ...zap.Field) { Logger.Info(msg, fields...) } -// Error logs an error message using zap or standard logger -func Error(msg string, err error) { +// Error logs an error message. err may be nil; if non-nil it is appended as +// a zap.Error field. Additional fields follow. +func Error(msg string, err error, fields ...zap.Field) { if Logger == nil { return } - Logger.Error(msg, zap.Error(err)) + if err != nil { + fields = append(fields, zap.Error(err)) + } + Logger.Error(msg, fields...) } -// Debug logs a debug message using zap or standard logger +// Debug logs a debug message. func Debug(msg string, fields ...zap.Field) { if Logger == nil { return @@ -198,7 +209,7 @@ func Debug(msg string, fields ...zap.Field) { Logger.Debug(msg, fields...) } -// Warn logs a warning message using zap or standard logger +// Warn logs a warning message. func Warn(msg string, fields ...zap.Field) { if Logger == nil { return @@ -206,63 +217,110 @@ func Warn(msg string, fields ...zap.Field) { Logger.Warn(msg, fields...) } -// IsDebugEnabled returns true if debug logging is enabled +// IsDebugEnabled returns true if debug logging is enabled. func IsDebugEnabled() bool { return atomicLevel.Enabled(zapcore.DebugLevel) } -// GetLevel returns the current log level +// GetLevel returns the current log level. func GetLevel() string { - levelMu.RLock() - defer levelMu.RUnlock() return atomicLevel.String() } -// GetLogLevels returns Python-compatible package log levels. -func GetLogLevels() map[string]string { - levelMu.RLock() - defer levelMu.RUnlock() - - levels := make(map[string]string, len(pkgLevels)) - for pkgName, level := range pkgLevels { - levels[pkgName] = level - } - return levels -} - -// SetLevel sets the log level at runtime +// SetLevel sets the log level at runtime. func SetLevel(level string) error { - levelMu.Lock() - defer levelMu.Unlock() - zapLevel, err := parseZapLevel(level) if err != nil { return err } atomicLevel.SetLevel(zapLevel) - if pkgLevels == nil { - pkgLevels = make(map[string]string) - } - pkgLevels["root"] = logLevelName(zapLevel) return nil } -// SetPackageLogLevel sets a Python-compatible package log level at runtime. -func SetPackageLogLevel(pkgName, level string) error { - zapLevel, err := parseZapLevel(level) - if err != nil { - return err +// ResolveCompress applies the project default (true) when the config-level +// Compress is nil. When non-nil, the operator's choice is used as-is. +// +// The project default is compression on; operators can opt out by setting +// log.compress: false in service_conf.yaml. Because Go's bool zero value is +// false and would otherwise be indistinguishable from "not set", the YAML +// struct uses *bool and this helper resolves the defaulting at the cmd/ +// boundary. The *bool does not live in this file because FileOutput itself +// takes a plain bool (the caller has already resolved the default by then). +func ResolveCompress(c *bool) bool { + if c == nil { + return true + } + return *c +} + +// GinLogger returns a gin middleware that emits one log line per request +// through Logger. Level is chosen by status: +// +// 5xx → Error (with err from c.Errors, or sentinel if none) +// 4xx → Warn +// else → Info +// +// c.Errors content is always included as a zap.String("error", ...) field +// when present, regardless of level. This is the project-standard HTTP +// access log; it replaces gin.Logger() so every request line lands in the +// same log file as the rest of the application. +// +// The raw query string is intentionally NOT logged — the path field +// carries only the URL path. Query parameters frequently carry secrets +// (OAuth codes, SAML responses, signed state, API keys in callback +// URLs, etc.) and there is no way to redact them generically. The +// presence and length of a query string are recorded instead so +// operators can still see that one was sent. +func GinLogger() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + path := c.Request.URL.Path + raw := c.Request.URL.RawQuery + c.Next() + latency := time.Since(start) + status := c.Writer.Status() + + fields := []zap.Field{ + zap.Int("status", status), + zap.String("method", c.Request.Method), + zap.String("path", path), + zap.Duration("latency", latency), + zap.String("client_ip", c.ClientIP()), + zap.Int("size", c.Writer.Size()), + zap.Bool("has_query", raw != ""), + zap.Int("query_len", len(raw)), + } + + var ginErr error + if len(c.Errors) > 0 { + last := c.Errors.Last() + // Only emit the string error field for non-5xx paths. The 5xx + // branch below routes ginErr through common.Error(), which + // already adds a structured zap.Error field; logging both + // creates two "error" fields in the same record and confuses + // log aggregation. 4xx / 2xx-3xx paths use Warn/Info which do + // not take an error arg, so the string form is their only + // way to surface c.Errors content. + if status < 500 { + fields = append(fields, zap.String("error", last.Error())) + } + ginErr = last.Err + } + + msg := "HTTP request" + switch { + case status >= 500: + if ginErr == nil { + // Likely a panic recovered by gin.Recovery() with no c.Error attached. + // Use a sentinel so the err field is non-empty; operators can + // grep for this string in logs. + ginErr = errors.New("5xx response with no handler error attached") + } + Error(msg, ginErr, fields...) + case status >= 400: + Warn(msg, fields...) + default: + Info(msg, fields...) + } } - - levelMu.Lock() - defer levelMu.Unlock() - - if pkgLevels == nil { - pkgLevels = make(map[string]string) - } - pkgLevels[pkgName] = logLevelName(zapLevel) - if pkgName == "root" { - atomicLevel.SetLevel(zapLevel) - } - return nil } diff --git a/internal/dao/database.go b/internal/dao/database.go index 366b6b424d..ae15481789 100644 --- a/internal/dao/database.go +++ b/internal/dao/database.go @@ -18,7 +18,6 @@ package dao import ( "fmt" - "log" "os" "path/filepath" "ragflow/internal/common" @@ -176,7 +175,7 @@ func InitDB() error { err = models.InitProviderManager("conf/models") if err != nil { - log.Fatal("Failed to load model providers:", err) + common.Fatal("Failed to load model providers", zap.Error(err)) } modelProviderManager = models.GetProviderManager() @@ -207,10 +206,10 @@ func GetModelProviderManager() *models.ProviderManager { } modelConfigDir, err := findModelConfigDir() if err != nil { - log.Fatal("Failed to locate model providers:", err) + common.Fatal("Failed to locate model providers", zap.Error(err)) } if err := models.InitProviderManager(modelConfigDir); err != nil { - log.Fatal("Failed to load model providers:", err) + common.Fatal("Failed to load model providers", zap.Error(err)) } modelProviderManager = models.GetProviderManager() return modelProviderManager diff --git a/internal/handler/system.go b/internal/handler/system.go index ba49d3814c..7c26074d40 100644 --- a/internal/handler/system.go +++ b/internal/handler/system.go @@ -162,22 +162,38 @@ func (h *SystemHandler) GetVersion(c *gin.Context) { }) } -// GetLogLevel returns the current log level +// GetLogLevel returns the current log level. The response uses the +// {"level": } shape — the same shape the admin handler's +// /admin/log_level endpoint returns — so the two log endpoints stay +// in lockstep. Per-package level entries that the old pkgLevels +// table carried (e.g. "peewee", "pdfminer") were inert for the Go +// side and are no longer returned. func (h *SystemHandler) GetLogLevel(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "code": 0, "message": "success", - "data": common.GetLogLevels(), + "data": gin.H{"level": common.GetLevel()}, }) } -// SetLogLevelRequest set log level request +// SetLogLevelRequest set log level request. PkgName is accepted for +// backward compatibility with clients that previously targeted +// per-package levels; it is silently ignored. Only the global level +// can be set on the Go side. type SetLogLevelRequest struct { - PkgName string `json:"pkg_name" binding:"required"` + PkgName string `json:"pkg_name"` Level string `json:"level" binding:"required"` } -// SetLogLevel sets the log level at runtime +// SetLogLevel sets the log level at runtime. +// +// The "pkg_name and level are required" error message is preserved +// verbatim from the pre-Go-port handler so existing clients that +// inspect `message` on the missing-field path keep working. On the +// Go side `pkg_name` is no longer required (per-package filtering +// is gone), but the message wording is unchanged for backward +// compatibility — only `level` is enforced by binding; `pkg_name` +// is accepted but ignored. func (h *SystemHandler) SetLogLevel(c *gin.Context) { var req SetLogLevelRequest if err := c.ShouldBindJSON(&req); err != nil { @@ -188,7 +204,7 @@ func (h *SystemHandler) SetLogLevel(c *gin.Context) { return } - if err := common.SetPackageLogLevel(req.PkgName, req.Level); err != nil { + if err := common.SetLevel(req.Level); err != nil { c.JSON(http.StatusOK, gin.H{ "code": common.CodeDataError, "message": "Invalid log level: " + req.Level, @@ -196,14 +212,13 @@ func (h *SystemHandler) SetLogLevel(c *gin.Context) { return } - config := server.GetConfig() - if req.PkgName == "root" && config != nil { + if config := server.GetConfig(); config != nil { config.Log.Level = common.GetLevel() } c.JSON(http.StatusOK, gin.H{ "code": 0, "message": "success", - "data": gin.H{"pkg_name": req.PkgName, "level": req.Level}, + "data": gin.H{"level": req.Level}, }) } diff --git a/internal/harness/core/interrupt.go b/internal/harness/core/interrupt.go index 4e1d330c78..99cdb41b5a 100644 --- a/internal/harness/core/interrupt.go +++ b/internal/harness/core/interrupt.go @@ -10,10 +10,12 @@ import ( "encoding/gob" "errors" "fmt" - "log" "os" + "ragflow/internal/common" "ragflow/internal/harness/core/schema" + + "go.uber.org/zap" ) // ---- Resume types ---- @@ -48,8 +50,6 @@ const ( AddressSegmentTool AddressSegmentType = "tool" ) - - type InterruptCtx struct { ID string Address Address @@ -139,7 +139,9 @@ func getAddressSegments(ctx context.Context) []AddressSegment { // FromInterruptContexts builds an InterruptSignal tree from a flat slice of // InterruptCtx. Returns nil when ctxs is empty. func FromInterruptContexts(ctxs []*InterruptCtx) *InterruptSignal { - if len(ctxs) == 0 { return nil } + if len(ctxs) == 0 { + return nil + } root := &InterruptSignal{} buildFromCtxs(ctxs, root) return root @@ -210,8 +212,8 @@ func extractCheckpointTenant(ctx context.Context) string { // ---- Checkpoint integrity (HMAC) ---- const ( - hmacLen = 32 - envHMACKey = "CHECKPOINT_HMAC_KEY" + hmacLen = 32 + envHMACKey = "CHECKPOINT_HMAC_KEY" ) // checkpointHMACKey reads the HMAC key from the CHECKPOINT_HMAC_KEY env var @@ -236,7 +238,7 @@ func loadCheckpointHMACKey() []byte { if _, err := rand.Read(k); err != nil { panic("failed to generate checkpoint HMAC key: " + err.Error()) } - log.Printf("WARNING: %s not set — using random per-process key. Checkpoint resume across restarts will fail.", envHMACKey) + common.Warn("checkpoint HMAC env not set — using random per-process key; checkpoint resume across restarts will fail", zap.String("env", envHMACKey)) return k } @@ -248,8 +250,12 @@ func computeCheckpointHMAC(payload []byte) []byte { func loadCheckpoint(store CheckPointStore, ctx context.Context, cid string) (context.Context, *runContext, *ResumeInfo, error) { data, exist, err := store.Get(ctx, cid) - if err != nil { return nil, nil, nil, fmt.Errorf("checkpoint get: %w", err) } - if !exist { return nil, nil, nil, fmt.Errorf("checkpoint %s not found", cid) } + if err != nil { + return nil, nil, nil, fmt.Errorf("checkpoint get: %w", err) + } + if !exist { + return nil, nil, nil, fmt.Errorf("checkpoint %s not found", cid) + } // Split: first 32 bytes = HMAC, rest = payload if len(data) < hmacLen { @@ -292,12 +298,14 @@ func loadCheckpoint(store CheckPointStore, ctx context.Context, cid string) (con return ctx, p.RunCtx, &ResumeInfo{ EnableStreaming: p.EnableStreaming, - InterruptInfo: p.Info, + InterruptInfo: p.Info, }, nil } func saveCheckpoint(store CheckPointStore, ctx context.Context, key string, enableStreaming bool, info *InterruptInfo, is *InterruptSignal) error { - if store == nil { return nil } + if store == nil { + return nil + } rc := getRunCtx(ctx) id2addr, id2state := signalToMaps(is) tenantID := extractCheckpointTenant(ctx) @@ -329,15 +337,23 @@ func saveCheckpoint(store CheckPointStore, ctx context.Context, key string, enab // by CompositeInterrupt/TypedCompositeInterrupt constructors. func signalToMaps(is *InterruptSignal) (map[string]Address, map[string]InterruptState) { a, s := make(map[string]Address), make(map[string]InterruptState) - if is == nil { return a, s } + if is == nil { + return a, s + } if is.ID != "" { a[is.ID] = is.Address - if is.State != nil { s[is.ID] = InterruptState{State: is.State} } + if is.State != nil { + s[is.ID] = InterruptState{State: is.State} + } } for _, c := range is.Children { ca, cs := signalToMaps(c) - for k, v := range ca { a[k] = v } - for k, v := range cs { s[k] = v } + for k, v := range ca { + a[k] = v + } + for k, v := range cs { + s[k] = v + } } return a, s } diff --git a/internal/harness/graph/checkpoint/nats.go b/internal/harness/graph/checkpoint/nats.go index 1d878694a5..d5b943b844 100644 --- a/internal/harness/graph/checkpoint/nats.go +++ b/internal/harness/graph/checkpoint/nats.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" "fmt" - "log" "sync" "time" + "ragflow/internal/common" "ragflow/internal/harness/graph/constants" + "github.com/nats-io/nats.go/jetstream" + "go.uber.org/zap" ) // NATSSaver implements BaseCheckpointer using NATS KV Store (JetStream-backed). @@ -21,18 +23,18 @@ import ( // // Garbage Collection (two layers): // -// Layer 1 — Per-key version management (zero-touch): -// NATS KV's History=N automatically discards old versions per key. -// Each graph instance keeps only the latest N checkpoints. -// This handles the normal case: an active graph continuously writes, -// older versions are naturally evicted by NATS. +// Layer 1 — Per-key version management (zero-touch): +// NATS KV's History=N automatically discards old versions per key. +// Each graph instance keeps only the latest N checkpoints. +// This handles the normal case: an active graph continuously writes, +// older versions are naturally evicted by NATS. // -// Layer 2 — Completed graph instance cleanup (background): -// When a graph finishes execution (or crashes), its key becomes dormant. -// The background GC periodically scans all keys and purges those -// whose latest checkpoint is older than MaxGraphIdle. -// An idle key = a completed/abandoned graph instance. -// This prevents orphaned checkpoint data from accumulating. +// Layer 2 — Completed graph instance cleanup (background): +// When a graph finishes execution (or crashes), its key becomes dormant. +// The background GC periodically scans all keys and purges those +// whose latest checkpoint is older than MaxGraphIdle. +// An idle key = a completed/abandoned graph instance. +// This prevents orphaned checkpoint data from accumulating. // // Multi-tenant: // - All graph instances across all tenants share one KV bucket. @@ -324,9 +326,7 @@ func (s *NATSSaver) collectGarbage() { } if purged > 0 { - // TODO: Replace with application-level structured logger when available. - // Using log.Printf as a lightweight fallback for GC events. - log.Printf("[NATSSaver] GC: purged %d completed graph instances (keys)", purged) + common.Info("NATSSaver GC purged completed graph instances", zap.Int("purged", purged)) } } diff --git a/internal/harness/graph/pregel/engine.go b/internal/harness/graph/pregel/engine.go index 53f9f6cd8b..b3a3674589 100644 --- a/internal/harness/graph/pregel/engine.go +++ b/internal/harness/graph/pregel/engine.go @@ -4,15 +4,17 @@ package pregel import ( "context" "fmt" - "log" "reflect" "sort" "strings" "sync" "github.com/google/uuid" - "ragflow/internal/harness/graph/checkpoint" + "go.uber.org/zap" + + "ragflow/internal/common" "ragflow/internal/harness/graph/channels" + "ragflow/internal/harness/graph/checkpoint" "ragflow/internal/harness/graph/constants" "ragflow/internal/harness/graph/errors" "ragflow/internal/harness/graph/graph" @@ -49,10 +51,10 @@ type Engine struct { // deferredCheckpoint stores checkpoint data for deferred saving (DurabilityExit mode) type deferredCheckpoint struct { - ThreadID string + ThreadID string CheckpointID string - Step int - Checkpoint map[string]interface{} + Step int + Checkpoint map[string]interface{} } // NewEngine creates a new Pregel engine bound to a StateGraph. @@ -179,7 +181,7 @@ type ExecuteResult struct { func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMode) (<-chan interface{}, <-chan error) { outputCh := make(chan interface{}, 100) errCh := make(chan error, 1) - + go func() { defer close(errCh) @@ -210,7 +212,7 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo fwWg.Wait() close(outputCh) }() - + // Create async pipeline for concurrent task execution retryPolicy := e.retryPolicy if retryPolicy == nil { @@ -220,7 +222,7 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo asyncPipeline := NewAsyncPipeline(e.maxConcurrency, retryPolicy) pipelineCtx := asyncPipeline.Start(ctx) defer asyncPipeline.Stop() - + // Reset per-execution engine state. // Without this, reusing the same Engine across multiple RunSync calls // causes checkpoint maps and channel versions to accumulate indefinitely, @@ -236,13 +238,13 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo for name, ch := range graphChannels { channelRegistry.Register(name, ch.Copy()) } - + // Apply input to channels if err := e.applyInput(channelRegistry, input); err != nil { errCh <- fmt.Errorf("failed to apply input: %w", err) return } - + // Get thread ID for checkpointing threadID := e.getThreadID() @@ -276,13 +278,13 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo defer backgroundExec.Stop() // Replace engine-level backgroundExec reference for use by async pipeline e.backgroundExec = backgroundExec - + // Execute Pregel loop step := 0 completedTasks := make(map[string]bool) lastCompletedNode := "" lastState := input - + for { // Check context cancellation at each superstep. select { @@ -297,27 +299,27 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo errCh <- &errors.GraphRecursionError{Limit: e.recursionLimit} return } - + // Emit checkpoint event via stream manager streamManager.EmitCheckpoint(step, channelRegistry.CreateCheckpoint()) - + // Determine next tasks tasks, triggers, err := e.prepareNextTasks(channelRegistry, completedTasks, lastCompletedNode, lastState) if err != nil { errCh <- fmt.Errorf("failed to prepare next tasks: %w", err) return } - + // Emit task start events for _, task := range tasks { streamManager.EmitTaskStart(step, task.Name, task.ID) } - + // If no tasks, we're done if len(tasks) == 0 { break } - + // Check for interrupts interruptedTasks := e.shouldInterrupt(channelRegistry, tasks, triggers) if len(interruptedTasks) > 0 { @@ -331,25 +333,25 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo return } } - + // Emit interrupt event interruptNames := make([]string, len(interruptedTasks)) for i, task := range interruptedTasks { interruptNames[i] = task.Name } streamManager.EmitInterrupt(step, interruptNames) - + errCh <- &errors.GraphInterrupt{} return } - + // Execute tasks using async pipeline results, err := e.executeTasksAsync(pipelineCtx, tasks, channelRegistry, asyncPipeline, streamManager, step) if err != nil { errCh <- fmt.Errorf("failed to execute tasks: %w", err) return } - + // Mark tasks as completed and track last state allFailed := len(results) > 0 for _, result := range results { @@ -368,24 +370,24 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo errCh <- fmt.Errorf("all %d tasks failed in step %d", len(results), step) return } - + // Apply writes to channels _, err = e.applyWrites(channelRegistry, results, triggers) if err != nil { errCh <- fmt.Errorf("failed to apply writes: %w", err) return } - + // Emit values event if values, err := channelRegistry.GetValues(); err == nil { streamManager.EmitValues(step, values) } - + // Save checkpoint based on durability mode if e.checkpointer != nil { checkpoint := channelRegistry.CreateCheckpoint() checkpointID := uuid.New().String() - + switch e.config.Durability { case types.DurabilitySync: // Synchronous save - block until complete @@ -398,7 +400,7 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo go func(cp map[string]interface{}, cpID string, s int) { if err := e.saveCheckpoint(context.Background(), threadID, cpID, s, cp); err != nil { // Log async error but don't fail execution - log.Printf("async checkpoint save failed: %v", err) + common.Error("async checkpoint save failed", err, zap.String("thread_id", threadID), zap.String("checkpoint_id", cpID), zap.Int("step", s)) } }(checkpoint, checkpointID, step) case types.DurabilityExit: @@ -413,17 +415,17 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo } } } - + step++ } - + // Get final state finalState, err := e.buildOutput(channelRegistry, lastState) if err != nil { errCh <- fmt.Errorf("failed to build output: %w", err) return } - + // Save deferred checkpoints for DurabilityExit mode if e.config.Durability == types.DurabilityExit { if err := e.saveDeferredCheckpoints(ctx); err != nil { @@ -431,11 +433,11 @@ func (e *Engine) Run(ctx context.Context, input interface{}, mode types.StreamMo return } } - + // Emit final event streamManager.EmitFinal(step, finalState) }() - + return outputCh, errCh } @@ -466,24 +468,24 @@ func (e *Engine) prepareNextTasksWithMode( ) ([]*Task, map[string]struct{}, error) { tasks := make([]*Task, 0) triggerToNodes := make(map[string]struct{}) - + // If this is the first step if len(completedTasks) == 0 { entryPoint := e.getEntryPoint() if entryPoint == "" { return nil, nil, fmt.Errorf("no entry point set") } - + // Handle direct edge Start → End (empty/trivial graph) if entryPoint == constants.End { return tasks, triggerToNodes, nil } - + node := e.getNode(entryPoint) if node == nil { return nil, nil, &errors.NodeNotFoundError{NodeName: entryPoint} } - + // Pass node Triggers as task Channels so the first task reads from // registered channels rather than receiving a nil state. triggers := e.getTriggers(node) @@ -492,26 +494,26 @@ func (e *Engine) prepareNextTasksWithMode( triggerToNodes["__start__"] = struct{}{} return tasks, triggerToNodes, nil } - + // AllPredecessor (DAG) mode: scan all uncompleted nodes and check if // ALL of their incoming-edge source nodes have completed. if e.graph.NodeTriggerMode == types.NodeTriggerAllPredecessor { return e.prepareNextTasksDAG(completedTasks, currentState, forExecution) } - + // AnyPredecessor (Pregel/BSP) mode: determine next nodes from the // last completed node's outgoing edges. nextNodes := e.getNextNodes(lastCompletedNode, currentState) - + for nodeName := range nextNodes { node := e.getNode(nodeName) if node == nil { continue } - + // Determine triggers for this node triggers := e.getTriggers(node) - + // BSP mode: always schedule, even if previously completed (supports loops). var task *Task if forExecution { @@ -520,13 +522,13 @@ func (e *Engine) prepareNextTasksWithMode( task = e.createTaskInfo(node, currentState, triggers, []string{}) } tasks = append(tasks, task) - + // Build trigger to nodes mapping for _, trigger := range triggers { triggerToNodes[trigger] = struct{}{} } } - + return tasks, triggerToNodes, nil } @@ -604,15 +606,15 @@ func (e *Engine) shouldInterrupt( triggerToNodes map[string]struct{}, ) []*Task { interrupted := make([]*Task, 0) - + // Check if any triggered node should interrupt if len(e.interrupts) == 0 { return interrupted } - + // Check if "*" is set (interrupt all) interruptAll := e.interrupts[types.All] - + for _, task := range tasks { shouldInterrupt := false if interruptAll { @@ -620,7 +622,7 @@ func (e *Engine) shouldInterrupt( } else { shouldInterrupt = e.interrupts[task.Name] } - + if shouldInterrupt { // Check if this task was triggered by a channel update triggered := false @@ -630,13 +632,13 @@ func (e *Engine) shouldInterrupt( break } } - + if triggered { interrupted = append(interrupted, task) } } } - + return interrupted } @@ -789,22 +791,22 @@ func (e *Engine) executeTasks( results := make([]*TaskResult, len(tasks)) var wg sync.WaitGroup var mu sync.Mutex - + for i, task := range tasks { wg.Add(1) go func(idx int, t *Task) { defer wg.Done() - + result := e.executeTask(ctx, t, registry) - + mu.Lock() results[idx] = result mu.Unlock() }(i, task) } - + wg.Wait() - + return results, nil } @@ -820,12 +822,12 @@ func (e *Engine) executeTasksAsync( results := make([]*TaskResult, len(tasks)) var wg sync.WaitGroup var mu sync.Mutex - + for i, task := range tasks { wg.Add(1) go func(idx int, t *Task) { defer wg.Done() - + // Read input for this task input, err := e.readTaskInput(registry, t) if err != nil { @@ -837,22 +839,22 @@ func (e *Engine) executeTasksAsync( mu.Unlock() return } - + // Define the function to execute executeFn := func(ctx context.Context) (interface{}, error) { return t.Func(ctx, input) } - + // Use task's retry policy or default retryPolicy := t.RetryPolicy if retryPolicy == nil { defaultPolicy := types.DefaultRetryPolicy() retryPolicy = &defaultPolicy } - + // Execute with async pipeline resultCh := asyncPipeline.ExecuteNode(ctx, t.Name, executeFn, &RetryConfig{Policy: retryPolicy}) - + // Wait for result select { case <-ctx.Done(): @@ -872,17 +874,17 @@ func (e *Engine) executeTasksAsync( mu.Unlock() return } - + // Convert async result to task result taskResult := &TaskResult{ Name: t.Name, Output: asyncResult.Output, Err: asyncResult.Err, } - + // Emit task end event streamManager.EmitTaskEnd(step, t.Name, t.ID, asyncResult.Output, asyncResult.Duration, asyncResult.Err) - + // Emit update event if successful if asyncResult.Err == nil { streamManager.EmitUpdate(step, t.Name, asyncResult.Output) @@ -890,14 +892,14 @@ func (e *Engine) executeTasksAsync( // Emit error event streamManager.EmitError(step, asyncResult.Err, t.Name) } - + mu.Lock() results[idx] = taskResult mu.Unlock() } }(i, task) } - + wg.Wait() return results, nil } @@ -916,21 +918,21 @@ func (e *Engine) executeTask( Err: fmt.Errorf("failed to read task input: %w", err), } } - + // Use RetryExecutor for retry logic retryPolicy := task.RetryPolicy if retryPolicy == nil { defaultPolicy := types.DefaultRetryPolicy() retryPolicy = &defaultPolicy } - + retryExecutor := NewRetryExecutor(retryPolicy) - + // Define the function to execute executeFn := func(ctx context.Context) (interface{}, error) { return task.Func(ctx, input) } - + // Execute with retry output, err := retryExecutor.Execute(ctx, task.Name, executeFn) if err != nil { @@ -954,7 +956,7 @@ func (e *Engine) executeTask( Err: err, } } - + // Success return &TaskResult{ Name: task.Name, @@ -968,7 +970,7 @@ func (e *Engine) readTaskInput(registry *channels.Registry, task *Task) (interfa if len(task.Channels) == 0 { return nil, nil } - + // Read values from specified channels values := make(map[string]interface{}) for _, channelName := range task.Channels { @@ -984,18 +986,18 @@ func (e *Engine) readTaskInput(registry *channels.Registry, task *Task) (interfa values[channelName] = value } } - + return values, nil } // Task represents a task to execute. type Task struct { - ID string - Name string - Func types.NodeFunc - Channels []string - Path []string - Triggers map[string]struct{} + ID string + Name string + Func types.NodeFunc + Channels []string + Path []string + Triggers map[string]struct{} RetryPolicy *types.RetryPolicy } @@ -1172,13 +1174,13 @@ func (e *Engine) applyInput(registry *channels.Registry, input interface{}) erro if err != nil { return err } - + // Apply each key to corresponding channel writes := make(map[string][]interface{}) for key, value := range inputMap { writes[key] = []interface{}{value} } - + return registry.UpdateChannels(writes) } @@ -1196,11 +1198,11 @@ func (e *Engine) buildOutput(registry *channels.Registry, lastState interface{}) if err != nil { return lastState, nil } - + if len(values) > 0 { return values, nil } - + return lastState, nil } @@ -1208,15 +1210,15 @@ func (e *Engine) mergeStates(existing, new interface{}) interface{} { if existing == nil { return new } - + if new == nil { return existing } - + // Try to merge maps existingMap, ok1 := existing.(map[string]interface{}) newMap, ok2 := new.(map[string]interface{}) - + if ok1 && ok2 { result := make(map[string]interface{}) for k, v := range existingMap { @@ -1227,7 +1229,7 @@ func (e *Engine) mergeStates(existing, new interface{}) interface{} { } return result } - + return new } @@ -1236,31 +1238,31 @@ func toMap(v interface{}) (map[string]interface{}, error) { if v == nil { return nil, fmt.Errorf("nil value") } - + // If it's already a map if m, ok := v.(map[string]interface{}); ok { return m, nil } - + // Use reflection to convert struct to map rv := reflect.ValueOf(v) if rv.Kind() == reflect.Ptr { rv = rv.Elem() } - + if rv.Kind() != reflect.Struct && rv.Kind() != reflect.Map { return map[string]interface{}{"__root__": v}, nil } - + result := make(map[string]interface{}) - + if rv.Kind() == reflect.Map { for _, key := range rv.MapKeys() { result[fmt.Sprintf("%v", key.Interface())] = rv.MapIndex(key).Interface() } return result, nil } - + // Struct rt := rv.Type() for i := 0; i < rv.NumField(); i++ { @@ -1270,12 +1272,12 @@ func toMap(v interface{}) (map[string]interface{}, error) { continue } value := rv.Field(i).Interface() - + // Convert field name to snake_case for consistency fieldName := toSnakeCase(field.Name) result[fieldName] = value } - + return result, nil } @@ -1299,7 +1301,7 @@ func (e *Engine) saveCheckpoint(ctx context.Context, threadID, checkpointID stri return e.checkpointer.Put(ctx, map[string]interface{}{ constants.ConfigKeyThreadID: threadID, constants.ConfigKeyCheckpointID: checkpointID, - "step": step, + "step": step, }, checkpoint) } @@ -1423,4 +1425,4 @@ func setNestedField(m map[string]interface{}, path string, val interface{}) { } } m[parts[len(parts)-1]] = val -} \ No newline at end of file +} diff --git a/internal/router/router.go b/internal/router/router.go index 2978c8b9dd..48e1b6e286 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -19,6 +19,7 @@ package router import ( "github.com/gin-gonic/gin" + "ragflow/internal/common" "ragflow/internal/handler" ) @@ -122,7 +123,7 @@ func (r *Router) Setup(engine *gin.Engine) { }) // Log all HTTP requests. - engine.Use(gin.Logger()) + engine.Use(common.GinLogger()) // Health check engine.GET("/health", r.systemHandler.Health) diff --git a/internal/server/config.go b/internal/server/config.go index 33cd3bf235..6c8a44b525 100644 --- a/internal/server/config.go +++ b/internal/server/config.go @@ -138,10 +138,31 @@ type DatabaseConfig struct { Charset string `mapstructure:"charset"` } -// LogConfig logging configuration +// LogConfig logging configuration. +// +// Path, MaxSize, MaxBackups, MaxAge, and Compress configure the rotated +// log file. The cmd/* entry points hardcode per-service defaults +// (e.g. "server_main.log" for the API server, "admin_server.log" for +// the admin server, "ingestion_server.log" for the ingestion worker), +// so a typical deployment gets a rotated file without any YAML +// configuration. When Path is empty (the default) the binary's +// hardcoded default filename is used — it does NOT disable file +// output. Set log.path in service_conf.yaml to override the +// per-service default filename. +// +// Compress is a pointer so callers can distinguish "not set" (nil, +// defaults to true) from "explicitly false" (*bool=false). All other +// numeric fields use plain int because their zero values are sensible +// defaults (100 MB / 10 files / 30 days) and there is no operator-meaningful +// reason to distinguish "not set" from "0". type LogConfig struct { - Level string `mapstructure:"level"` // debug, info, warn, error - Format string `mapstructure:"format"` // json, text + Level string `mapstructure:"level"` // debug, info, warn, error + Format string `mapstructure:"format"` // json, text (reserved for future use) + Path string `mapstructure:"path"` // per-binary file override; empty = use cmd/* hardcoded default + MaxSize int `mapstructure:"max_size"` // MB before rotation; default 100 + MaxBackups int `mapstructure:"max_backups"` // retained rotated files; default 10 + MaxAge int `mapstructure:"max_age"` // days; default 30 + Compress *bool `mapstructure:"compress"` // gzip rotated files; nil = default true } // DocEngineConfig document engine configuration diff --git a/internal/service/agent.go b/internal/service/agent.go index dfd499dd53..42a12b9d2c 100644 --- a/internal/service/agent.go +++ b/internal/service/agent.go @@ -21,13 +21,13 @@ import ( "encoding/json" "errors" "fmt" - "log" "strings" "sync" "time" "github.com/cloudwego/eino/compose" "github.com/google/uuid" + "go.uber.org/zap" "gorm.io/gorm" "ragflow/internal/agent/canvas" @@ -680,13 +680,20 @@ func (s *AgentService) RunAgent(ctx context.Context, userID, canvasID, sessionID if tenantIDs, terr := s.userTenantDAO.GetTenantIDsByUserID(userID); terr == nil && len(tenantIDs) > 0 { root["tenant_id"] = tenantIDs[0] } else if terr != nil { - log.Printf("service: RunAgent userTenantDAO.GetTenantIDsByUserID(%q): %v (best-effort, run not blocked)", userID, terr) + common.Warn("service: RunAgent userTenantDAO.GetTenantIDsByUserID (best-effort, run not blocked)", + zap.String("user_id", userID), + zap.Error(terr)) } // v3.6.1 diagnostic: log what RunAgent put into root so we can // confirm tenant_id / user_id / session_id / user_input all // reached the buildRunFunc closure (which runs in the runner's // goroutine, possibly after a context switch). - log.Printf("DEBUG RunAgent root canvasID=%q userID=%q sessionID=%q tenantID=%v userInput_len=%d", canvasID, userID, sessionID, root["tenant_id"], func() int { s, _ := root["user_input"].(string); return len(s) }()) + common.Debug("RunAgent root", + zap.String("canvasID", canvasID), + zap.String("userID", userID), + zap.String("sessionID", sessionID), + zap.Any("tenantID", root["tenant_id"]), + zap.Int("userInput_len", func() int { s, _ := root["user_input"].(string); return len(s) }())) return s.runner.Run(ctx, run, canvasID, sessionID, userInput, root), nil } @@ -865,10 +872,13 @@ func (s *AgentService) buildRunFunc(canvasID string, versionRow *entity.UserCanv cc, err = canvas.Compile(ctx2, c) } if err != nil { - log.Printf( - "DEBUG RunAgent compile err canvas=%q session=%q task=%q run=%q: %T: %v", - canvasID, sessionID, taskID, runID, err, err, - ) + common.Debug("RunAgent compile err", + zap.String("canvas", canvasID), + zap.String("session", sessionID), + zap.String("task", taskID), + zap.String("run", runID), + zap.String("type", fmt.Sprintf("%T", err)), + zap.Error(err)) s.markRunFailed(ctx2, runID, "compile: "+err.Error()) return nil, fmt.Errorf("canvas compile: %w: %w", ErrAgentStorageError, err) } @@ -929,10 +939,13 @@ func (s *AgentService) buildRunFunc(canvasID string, versionRow *entity.UserCanv } if err != nil { - log.Printf( - "DEBUG RunAgent invoke err canvas=%q session=%q task=%q run=%q: %T: %v", - canvasID, sessionID, taskID, runID, err, err, - ) + common.Debug("RunAgent invoke err", + zap.String("canvas", canvasID), + zap.String("session", sessionID), + zap.String("task", taskID), + zap.String("run", runID), + zap.String("type", fmt.Sprintf("%T", err)), + zap.Error(err)) if canvas.IsInterruptError(err) { s.markRunFailed(ctx2, runID, "interrupt: "+err.Error()) if answer != "" { @@ -1040,7 +1053,9 @@ func (s *AgentService) markRunSucceeded(ctx context.Context, runID string) { return } if err := s.runTracker.MarkSucceeded(ctx, runID); err != nil { - log.Printf("service: RunAgent runTracker.MarkSucceeded(%q): %v (best-effort, run not blocked)", runID, err) + common.Warn("service: RunAgent runTracker.MarkSucceeded (best-effort, run not blocked)", + zap.String("run_id", runID), + zap.Error(err)) } } @@ -1052,7 +1067,10 @@ func (s *AgentService) markRunFailed(ctx context.Context, runID, reason string) return } if err := s.runTracker.MarkFailed(ctx, runID, reason); err != nil { - log.Printf("service: RunAgent runTracker.MarkFailed(%q, %q): %v (best-effort, run not blocked)", runID, reason, err) + common.Warn("service: RunAgent runTracker.MarkFailed (best-effort, run not blocked)", + zap.String("run_id", runID), + zap.String("reason", reason), + zap.Error(err)) } } diff --git a/internal/tokenizer/tokenizer_concurrent_test.go b/internal/tokenizer/tokenizer_concurrent_test.go index 0fec34dc78..877da7d30c 100644 --- a/internal/tokenizer/tokenizer_concurrent_test.go +++ b/internal/tokenizer/tokenizer_concurrent_test.go @@ -30,7 +30,7 @@ import ( func init() { // Initialize logger for tests - if err := common.Init("info", ""); err != nil { + if err := common.Init("info", common.FileOutput{}); err != nil { fmt.Printf("Failed to initialize logger: %v\n", err) } }