ragflow/internal/service/canvas_decode.go
Zhichang Yu 3f805a64f1 feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)
## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Except for the
retrieval component (see below), every component, tool, and runtime
primitive in `internal/agent/` now follows the same semantics as its
Python counterpart: variable resolution, template substitution, control
flow, error reporting, retry/cancel, and stream event shapes.

The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance`.
All tools implement eino's `tool.InvokableTool` interface and share SSRF
protection and a common HTTP client.
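
SSRF protection for outbound tool calls usually comes down to refusing connections to internal address ranges. The following is a minimal sketch of one common approach and assumes nothing about this PR's actual implementation: the check runs in a `net.Dialer` `Control` hook on a single shared `http.Client`, so it sees the already-resolved IP rather than the hostname. The names `isBlockedIP`, `safeDialControl`, and `newSafeClient` are hypothetical; `net.IP.IsPrivate` needs Go 1.17 or later.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"syscall"
	"time"
)

// isBlockedIP reports whether ip is in a range an outbound tool request
// should never reach: loopback, private (RFC 1918 / RFC 4193),
// link-local (which covers the 169.254.169.254 metadata address),
// or unspecified.
func isBlockedIP(ip net.IP) bool {
	return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() ||
		ip.IsLinkLocalMulticast() || ip.IsUnspecified()
}

// safeDialControl runs after DNS resolution, so address is a literal
// "ip:port". Checking here, rather than on the URL hostname, also catches
// a hostname that resolves differently at connect time.
func safeDialControl(network, address string, _ syscall.RawConn) error {
	host, _, err := net.SplitHostPort(address)
	if err != nil {
		return err
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return fmt.Errorf("ssrf: unparseable address %q", address)
	}
	if isBlockedIP(ip) {
		return fmt.Errorf("ssrf: blocked %s address %s", network, ip)
	}
	return nil
}

// newSafeClient builds one client that every tool can share. Proxy is
// left nil on purpose: with an environment proxy the dialer would only
// see the proxy's address, not the real target.
func newSafeClient() *http.Client {
	dialer := &net.Dialer{Timeout: 10 * time.Second, Control: safeDialControl}
	return &http.Client{
		Transport: &http.Transport{DialContext: dialer.DialContext},
		Timeout:   30 * time.Second,
	}
}

func main() {
	for _, addr := range []string{"127.0.0.1:80", "10.0.0.5:443", "169.254.169.254:80", "[::1]:8080", "93.184.216.34:443"} {
		fmt.Printf("%-22s %v\n", addr, safeDialControl("tcp", addr, nil))
	}
	_ = newSafeClient()
}
```

Because every tool would reuse the same client, the guard lives in one place instead of being repeated in each tool.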

### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.

### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's fixtures (`agent_msg.json`, `all.json`, etc.).

### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.

## Out of scope (intentionally)

- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).

`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00


//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package service

import (
	"fmt"

	"ragflow/internal/agent/canvas"
)

// decodeCanvasFromDSL converts the DSL map (in either IMPORT shape
// or NormalizeForCanvas output shape) into a *canvas.Canvas that
// canvas.Compile accepts.
//
// Two accepted input shapes:
//
// 1. IMPORT shape (top-level "obj.component_name" + "obj.params"
// + outer "downstream" / "upstream"). The Python-era DSL
// convention; some legacy v1 fixtures still use it directly.
//
// 2. Normalized shape (top-level "name" + "params" + "downstream" /
// "upstream"). The output of dslpkg.NormalizeForCanvas, which
// is what service.normalisedDSLForRun currently feeds into
// buildRunFunc (gap analysis §11.7.4 V2 follow-up chain).
//
// The canvas.Canvas struct itself uses IMPORT shape
// (CanvasComponentObj.ComponentName with json tag
// "component_name"). normalize.go's buildGraphFromComponents
// flattens the components map to the normalized shape so the
// React-Flow editor gets a stable byte-equal layout; the runtime
// then has to walk both shapes here.
//
// All non-sentinel failures wrap ErrAgentStorageError so the
// handler's mapAgentError classifies them as CodeServerError
// (500) with a sanitized message — the raw decoder error string
// never reaches the client.
//
// Decoder strategy: direct map walking, NOT JSON round-trip. The
// Phase 4.4 V2 plan §4.3 originally specified JSON round-trip
// for the IMPORT shape, but the production path goes through
// NormalizeForCanvas first (normalized shape), and round-tripping
// the normalized shape through JSON loses the `name` →
// `obj.component_name` mapping (json.Unmarshal into Canvas does
// not coerce flat keys into nested `obj`). Direct map walking
// handles both shapes without that hazard.
func decodeCanvasFromDSL(dsl map[string]any) (*canvas.Canvas, error) {
	if len(dsl) == 0 {
		return nil, fmt.Errorf("decode canvas: empty DSL: %w", ErrAgentStorageError)
	}
	rawComps, ok := dsl["components"].(map[string]any)
	if !ok || len(rawComps) == 0 {
		return nil, fmt.Errorf("decode canvas: no components: %w", ErrAgentStorageError)
	}
	c := &canvas.Canvas{
		Components:  make(map[string]canvas.CanvasComponent, len(rawComps)),
		NodeParents: make(map[string]string),
	}
	if p, ok := dsl["path"].([]any); ok {
		c.Path = make([]string, 0, len(p))
		for _, v := range p {
			if s, ok := v.(string); ok {
				c.Path = append(c.Path, s)
			}
		}
	}
	if p, ok := dsl["globals"].(map[string]any); ok {
		c.Globals = p
	}
	if graph, ok := dsl["graph"].(map[string]any); ok {
		if nodes, ok := graph["nodes"].([]any); ok {
			for _, raw := range nodes {
				node, ok := raw.(map[string]any)
				if !ok || node == nil {
					continue
				}
				id, _ := node["id"].(string)
				parentID, _ := node["parentId"].(string)
				if id != "" && parentID != "" {
					c.NodeParents[id] = parentID
				}
			}
		}
	}
	for cpnID, raw := range rawComps {
		comp, ok := raw.(map[string]any)
		if !ok {
			continue
		}
		name, params, downstream, upstream := extractComponentFields(comp)
		if name == "" {
			return nil, fmt.Errorf("decode canvas: component %q has empty component_name: %w", cpnID, ErrAgentStorageError)
		}
		c.Components[cpnID] = canvas.CanvasComponent{
			Obj: canvas.CanvasComponentObj{
				ComponentName: name,
				Params:        params,
			},
			Downstream: downstream,
			Upstream:   upstream,
		}
	}
	if len(c.Components) == 0 {
		return nil, fmt.Errorf("decode canvas: no components: %w", ErrAgentStorageError)
	}
	return c, nil
}

// extractComponentFields reads (name, params, downstream, upstream)
// from a single component map. Accepts both IMPORT shape
// (obj.component_name / obj.params, with downstream either under obj
// or at the outer level) and normalized shape (flat name / flat
// params / flat downstream). upstream is always read from the outer
// level.
//
// Preference order: obj.* (canonical IMPORT shape) first, then the
// flat keys as a fallback for normalized input. This matches the way
// dsl.extractComponent (normalize.go:269) walks both shapes, so the
// V2 decoder and the normalizer resolve shared key names in the same
// order.
func extractComponentFields(comp map[string]any) (name string, params map[string]any, downstream []string, upstream []string) {
	if obj, ok := comp["obj"].(map[string]any); ok {
		name, _ = obj["component_name"].(string)
		if p, ok := obj["params"].(map[string]any); ok {
			params = p
		}
		if ds, ok := obj["downstream"].([]any); ok {
			downstream = toStringSlice(ds)
		} else if ds, ok := obj["downstream"].([]string); ok {
			downstream = ds
		}
	}
	if name == "" {
		name, _ = comp["name"].(string)
	}
	if params == nil {
		if p, ok := comp["params"].(map[string]any); ok {
			params = p
		}
	}
	if len(downstream) == 0 {
		if ds, ok := comp["downstream"].([]any); ok {
			downstream = toStringSlice(ds)
		} else if ds, ok := comp["downstream"].([]string); ok {
			downstream = ds
		}
	}
	if us, ok := comp["upstream"].([]any); ok {
		upstream = toStringSlice(us)
	} else if us, ok := comp["upstream"].([]string); ok {
		upstream = us
	}
	return
}

// toStringSlice converts a []any to a []string, keeping only the
// string elements (others are silently dropped). It returns nil for
// nil or empty input.
func toStringSlice(in []any) []string {
	if len(in) == 0 {
		return nil
	}
	out := make([]string, 0, len(in))
	for _, v := range in {
		if s, ok := v.(string); ok {
			out = append(out, s)
		}
	}
	return out
}
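
The decoder-strategy note in the `decodeCanvasFromDSL` comment says that a JSON round-trip of the normalized shape loses the `name` → `obj.component_name` mapping. The self-contained sketch below reproduces that hazard with hypothetical stand-in structs that carry only the json tags the comment mentions (they are not the real `canvas` types): the IMPORT-shape map keeps its name through `json.Marshal`/`json.Unmarshal`, the normalized map comes back with an empty name, and a direct map walk recovers the name in both cases.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// componentObj and component mirror only the json tags the comment
// describes; they are stand-ins, not canvas.CanvasComponentObj /
// canvas.CanvasComponent.
type componentObj struct {
	ComponentName string         `json:"component_name"`
	Params        map[string]any `json:"params"`
}

type component struct {
	Obj        componentObj `json:"obj"`
	Downstream []string     `json:"downstream"`
}

// roundTripName decodes a component map through JSON into the nested
// struct, the strategy the comment rejects.
func roundTripName(raw map[string]any) (string, error) {
	b, err := json.Marshal(raw)
	if err != nil {
		return "", err
	}
	var c component
	if err := json.Unmarshal(b, &c); err != nil {
		return "", err
	}
	return c.Obj.ComponentName, nil
}

// walkName mirrors the map-walking order: obj.component_name first,
// then the flat "name" key.
func walkName(raw map[string]any) string {
	if obj, ok := raw["obj"].(map[string]any); ok {
		if n, _ := obj["component_name"].(string); n != "" {
			return n
		}
	}
	n, _ := raw["name"].(string)
	return n
}

func main() {
	importShape := map[string]any{
		"obj":        map[string]any{"component_name": "Begin", "params": map[string]any{}},
		"downstream": []any{"Message:0"},
	}
	normalized := map[string]any{
		"name":       "Begin",
		"params":     map[string]any{},
		"downstream": []any{"Message:0"},
	}
	for _, raw := range []map[string]any{importShape, normalized} {
		rt, err := roundTripName(raw)
		fmt.Printf("round-trip=%q walk=%q err=%v\n", rt, walkName(raw), err)
	}
}
```

`json.Unmarshal` silently ignores keys with no matching field, so the failure shows up as an empty name rather than an error, which is why an explicit empty-name check like the one in `decodeCanvasFromDSL` is worth having.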