ragflow/internal/agent/component/variable_aggregator.go
Zhichang Yu 3f805a64f1 feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)
## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.

The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.

### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.

### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.

### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
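
To illustrate the `{{cpn_id@param}}` resolution model, here is a small sketch of what a regex fast path might look like. It is an assumption-laden illustration rather than the actual `ResolveTemplate`: the function name `resolveRefs`, the character classes in the pattern, and the flat `vars` map are hypothetical, and the gonja fallback is not shown.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches references such as {{cpn_id@param}} and {{sys.query}}.
// The allowed characters are an assumption, not taken from the Go port.
var refPattern = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_:.-]+(?:@[A-Za-z0-9_.-]+)?)\s*\}\}`)

// resolveRefs substitutes every reference with its value from vars.
// An unknown reference is reported as an error instead of being left
// in the output.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	var firstErr error
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		// Re-match the single hit to extract the captured reference name.
		ref := refPattern.FindStringSubmatch(m)[1]
		val, ok := vars[ref]
		if !ok {
			if firstErr == nil {
				firstErr = fmt.Errorf("unresolved reference %q", ref)
			}
			return m
		}
		return fmt.Sprint(val)
	})
	if firstErr != nil {
		return "", firstErr
	}
	return out, nil
}

func main() {
	vars := map[string]any{"sys.query": "hello", "llm_0@content": "world"}
	s, err := resolveRefs("{{sys.query}}, {{ llm_0@content }}", vars)
	fmt.Println(s, err)
}
```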

## Out of scope (intentionally)

- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).

`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00


//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package component — VariableAggregator (T3, plan §2.11.3 row 19).
//
// For each "group" in its param, VariableAggregator walks a list of
// variable selectors and picks the first one whose resolved value is
// truthy. The picked value is exposed at outputs[<group_name>].
//
// Mirrors agent/component/variable_aggregator.py. The Python implementation
// also records each group's variables list under the synthetic input key
// "<group_name>.variables" for engine bookkeeping; the Go port skips that
// side-effect because the canvas engine consumes param data directly via
// the component factory, not via a re-emitted inputs map.
package component

import (
	"context"
	"fmt"

	"ragflow/internal/agent/runtime"
)

const componentNameVariableAggregator = "VariableAggregator"

// variableAggregatorParam is the static configuration loaded from the DSL.
// It mirrors the Python VariableAggregatorParam surface.
type variableAggregatorParam struct {
	// Groups is a list of {group_name, variables} dicts. Each
	// group.variables entry is itself a {value: <ref-string>} dict.
	Groups []map[string]any `json:"groups"`
}

// Update copies a fresh param map into the receiver. Mirrors the Python
// ComponentParamBase contract.
//
// `groups` may arrive as either []any (engine-decoded from JSON) or
// []map[string]any (test/direct construction); both shapes are accepted
// so callers don't have to coerce.
func (p *variableAggregatorParam) Update(conf map[string]any) error {
	if conf == nil {
		p.Groups = nil
		return nil
	}
	rawGroups, ok := conf["groups"]
	if !ok {
		p.Groups = nil
		return nil
	}
	var groupsList []any
	switch x := rawGroups.(type) {
	case []any:
		groupsList = x
	case []map[string]any:
		groupsList = make([]any, 0, len(x))
		for _, g := range x {
			groupsList = append(groupsList, g)
		}
	default:
		return &ParamError{Field: "groups", Reason: "must be a list"}
	}
	out := make([]map[string]any, 0, len(groupsList))
	for i, raw := range groupsList {
		g, ok := raw.(map[string]any)
		if !ok {
			return &ParamError{Field: fmt.Sprintf("groups[%d]", i), Reason: "must be a map"}
		}
		out = append(out, g)
	}
	p.Groups = out
	return nil
}

// Check performs shallow validation. Mirrors VariableAggregatorParam.check.
func (p *variableAggregatorParam) Check() error {
	if len(p.Groups) == 0 {
		return &ParamError{Field: "groups", Reason: "must not be empty"}
	}
	for i, g := range p.Groups {
		name, _ := g["group_name"].(string)
		if name == "" {
			return &ParamError{Field: fmt.Sprintf("groups[%d].group_name", i), Reason: "must not be empty"}
		}
		vars, ok := g["variables"]
		if !ok {
			return &ParamError{Field: fmt.Sprintf("groups[%d].variables", i), Reason: "must be a list"}
		}
		switch vars.(type) {
		case []any, []map[string]any:
			// accept both shapes
		default:
			return &ParamError{Field: fmt.Sprintf("groups[%d].variables", i), Reason: "must be a list"}
		}
	}
	return nil
}

// AsDict returns the params as a plain map.
func (p *variableAggregatorParam) AsDict() map[string]any {
	groups := make([]any, 0, len(p.Groups))
	for _, g := range p.Groups {
		groups = append(groups, g)
	}
	return map[string]any{"groups": groups}
}

// VariableAggregatorComponent walks each group's selectors and emits
// outputs[group_name] = first non-empty resolved value.
type VariableAggregatorComponent struct {
	name  string
	param variableAggregatorParam
}

// NewVariableAggregatorComponent constructs a VariableAggregator from
// the DSL param map. The param is validated via Check(); a check failure
// is returned to the caller so the engine can surface a clean error.
func NewVariableAggregatorComponent(params map[string]any) (Component, error) {
	p := &variableAggregatorParam{}
	if err := p.Update(params); err != nil {
		return nil, fmt.Errorf("VariableAggregator: param update: %w", err)
	}
	if err := p.Check(); err != nil {
		return nil, fmt.Errorf("VariableAggregator: param check: %w", err)
	}
	return &VariableAggregatorComponent{
		name:  componentNameVariableAggregator,
		param: *p,
	}, nil
}

// Name returns the registered component name.
func (v *VariableAggregatorComponent) Name() string { return v.name }

// Invoke iterates the configured groups and resolves each selector's
// value against the canvas state. The first truthy value in a group
// wins; outputs[group_name] is set to that value. Groups with no truthy
// selector produce no output key.
//
// Groups may be supplied in two ways:
//   - static via param.groups[i].variables[j].value
//   - runtime via inputs["variables"], a non-empty list that REPLACES
//     the static groups for the duration of this call.
//
// The runtime override matches the Python component's get_input_form
// contract: the engine is allowed to pass the resolved variable list
// per-invocation. When inputs["variables"] is absent, empty, or does not
// start with a map, the static param config is used unchanged.
func (v *VariableAggregatorComponent) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) {
	state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx)
	if err != nil {
		return nil, fmt.Errorf("VariableAggregator: %w", err)
	}
	if state == nil {
		return nil, fmt.Errorf("VariableAggregator: nil canvas state")
	}
	groups := v.param.Groups
	// Optional runtime override. Two shapes are accepted, selected by
	// the first element:
	//   - [{groups: [...]}, ...]: each wrapper's groups are flattened
	//     into one list;
	//   - [{group_name, variables}, ...]: the list replaces all groups.
	// Elements that are not maps are skipped. A bare list of selector
	// dicts is not supported: such entries carry no group_name and are
	// skipped by the loop below.
	if override, ok := inputs["variables"].([]any); ok && len(override) > 0 {
		if first, ok := override[0].(map[string]any); ok {
			if _, hasGroups := first["groups"]; hasGroups {
				// shape: [{groups: [...]}], flatten the outer wrapper
				groups = make([]map[string]any, 0, len(override))
				for _, raw := range override {
					if m, ok := raw.(map[string]any); ok {
						if gs, ok := m["groups"].([]any); ok {
							for _, g := range gs {
								if gm, ok := g.(map[string]any); ok {
									groups = append(groups, gm)
								}
							}
						}
					}
				}
			} else {
				// treat override as a list of group dicts
				groups = make([]map[string]any, 0, len(override))
				for _, g := range override {
					if gm, ok := g.(map[string]any); ok {
						groups = append(groups, gm)
					}
				}
			}
		}
	}
	out := make(map[string]any, len(groups))
	for _, g := range groups {
		gname, _ := g["group_name"].(string)
		if gname == "" {
			continue
		}
		// Check() accepts both []any and []map[string]any for
		// "variables", so both shapes are normalized here; otherwise a
		// directly constructed group would silently yield no output.
		var selectors []any
		switch vs := g["variables"].(type) {
		case []any:
			selectors = vs
		case []map[string]any:
			selectors = make([]any, 0, len(vs))
			for _, s := range vs {
				selectors = append(selectors, s)
			}
		}
		for _, raw := range selectors {
			sel, ok := raw.(map[string]any)
			if !ok {
				continue
			}
			ref, _ := sel["value"].(string)
			if ref == "" {
				continue
			}
			// Unresolvable or falsy references fall through to the
			// next selector in the group.
			val, err := state.GetVar(ref)
			if err != nil || !isTruthy(val) {
				continue
			}
			out[gname] = val
			break
		}
	}
	return out, nil
}

// Stream mirrors Invoke; VariableAggregator is a single-shot reduce.
func (v *VariableAggregatorComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
	out, err := v.Invoke(ctx, inputs)
	if err != nil {
		return nil, err
	}
	ch := make(chan map[string]any, 1)
	ch <- out
	close(ch)
	return ch, nil
}

// Inputs returns the public parameter surface. The "variables" key
// accepts a runtime override of the per-group variable list (matching
// the Python get_input_form contract).
func (v *VariableAggregatorComponent) Inputs() map[string]string {
	return map[string]string{
		"variables": "Optional runtime override of the per-group variable selector list.",
		"groups":    "Optional runtime override of the aggregator group configuration: [{group_name, variables}].",
	}
}

// Outputs returns one key per configured group: <group_name> = first
// non-empty resolved value for that group.
func (v *VariableAggregatorComponent) Outputs() map[string]string {
	out := make(map[string]string, len(v.param.Groups))
	for _, g := range v.param.Groups {
		if name, _ := g["group_name"].(string); name != "" {
			out[name] = "First non-empty resolved value among the group's selectors."
		}
	}
	return out
}

// isTruthy approximates Python's bool() coercion for the value shapes
// the canvas state is expected to hold: nil is false, empty
// strings/slices/maps are false, zero numbers are false, false is
// false. Any type not listed below is treated as true.
func isTruthy(v any) bool {
	switch x := v.(type) {
	case nil:
		return false
	case bool:
		return x
	case string:
		return x != ""
	case []any:
		return len(x) > 0
	case []map[string]any:
		return len(x) > 0
	case map[string]any:
		return len(x) > 0
	case int:
		return x != 0
	case int32:
		return x != 0
	case int64:
		return x != 0
	case float32:
		return x != 0
	case float64:
		return x != 0
	}
	return true
}

func init() {
	Register(componentNameVariableAggregator, NewVariableAggregatorComponent)
}
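
The first-truthy selection performed by `Invoke` can be tried in isolation with the standalone sketch below. It is an illustration under stated assumptions, not part of the component: `aggregate` and `truthy` are hypothetical names, and the flat `vars` map stands in for `CanvasState.GetVar`.

```go
package main

import "fmt"

// truthy is a reduced stand-in for the file's isTruthy helper.
func truthy(v any) bool {
	switch x := v.(type) {
	case nil:
		return false
	case bool:
		return x
	case string:
		return x != ""
	case []any:
		return len(x) > 0
	case int:
		return x != 0
	case float64:
		return x != 0
	}
	return true
}

// aggregate returns, per group, the first selector value that is truthy.
// vars is a hypothetical flat lookup standing in for CanvasState.GetVar.
func aggregate(groups []map[string]any, vars map[string]any) map[string]any {
	out := make(map[string]any, len(groups))
	for _, g := range groups {
		name, _ := g["group_name"].(string)
		if name == "" {
			continue
		}
		selectors, _ := g["variables"].([]any)
		for _, raw := range selectors {
			sel, _ := raw.(map[string]any) // nil map when the entry is malformed
			ref, _ := sel["value"].(string)
			if val, ok := vars[ref]; ok && truthy(val) {
				out[name] = val
				break
			}
		}
	}
	return out
}

func main() {
	vars := map[string]any{"llm_a@content": "", "llm_b@content": "from B"}
	groups := []map[string]any{
		{"group_name": "answer", "variables": []any{
			map[string]any{"value": "llm_a@content"},
			map[string]any{"value": "llm_b@content"},
		}},
		{"group_name": "unused", "variables": []any{
			map[string]any{"value": "missing@x"},
		}},
	}
	fmt.Println(aggregate(groups, vars)) // prints map[answer:from B]
}
```

The `unused` group has no truthy selector, so it contributes no key, which matches the behavior described in the `Invoke` doc comment.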