ragflow/internal/service/canvas_decode_test.go
Zhichang Yu 3f805a64f1 feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)
## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Apart from the
retrieval exception described below, every component, tool, and runtime
primitive in `internal/agent/` now follows the same semantics as its
Python counterpart: variable resolution, template substitution, control
flow, error reporting, retry/cancel, and stream event shapes.

The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(ReAct loop + tool artifact capture + `Reset()` interface assertion) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance`. All
tools implement eino's `tool.InvokableTool` interface, apply SSRF
protection, and share one HTTP client.

### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.

### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: collapses v1 and v2 DSLs into a single wire format, so
Python and Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.

### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate`: a regex fast path with a gonja fallback, giving
Python's Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
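The `{{cpn_id@param}}` resolution model can be illustrated with a regex-only resolver. This is a simplified sketch of a fast path alone, with no gonja fallback. The names `refPattern` and `resolveRefs` and the exact identifier character set are hypothetical. It returns an error for an unresolvable reference, the failure mode that `TestRunAgent_RealCanvas_InvokeFails` exercises.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches {{cpn_id@param}} and dotted globals such as {{sys.query}}.
// Component ids may contain ':' (e.g. "Message:IterDone"). Hypothetical syntax.
var refPattern = regexp.MustCompile(`\{\{\s*([\w:-]+@[\w.-]+|\w+\.[\w.-]+)\s*\}\}`)

// resolveRefs substitutes every reference with fmt.Sprint of its value and
// reports the first reference that vars does not define.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	var missing string
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			if missing == "" {
				missing = key
			}
			return m // leave the unresolved reference untouched
		}
		return fmt.Sprint(v)
	})
	if missing != "" {
		return "", fmt.Errorf("unresolvable template reference %q", missing)
	}
	return out, nil
}

func main() {
	vars := map[string]any{"sys.query": "hello", "begin_0@result": 42}
	fmt.Println(resolveRefs("q={{sys.query}} r={{ begin_0@result }}", vars))
	fmt.Println(resolveRefs("{{llm_0@content}}", vars))
}
```

Only flat key lookup is shown. Filters, nested paths, and Jinja control flow are the cases a template-engine fallback such as gonja would cover.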

## Out of scope (intentionally)

- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
It runs on in-memory SQLite plus miniredis (no Docker) and covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go` checks SSE streaming parity
(`Content-Type: text/event-stream`, `data: {…}\n\n` frames, a trailing
`data: [DONE]\n\n`) and the OpenAI-compatible non-stream `choices` response.

`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00


//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Tests for the Phase 4.4 V2 canvas DSL decoder and the AgentService
// constructors.
//
// The decoder tests each pin one branch of decodeCanvasFromDSL:
//   - empty DSL (nil or empty map) → ErrAgentStorageError, no JSON round-trip
//   - non-empty DSL with one component → clean *canvas.Canvas
//   - non-empty DSL with multiple components → all preserved
//   - non-empty DSL with no components → ErrAgentStorageError
//   - top-level globals and graph node parents survive the round-trip
//
// All failures MUST carry ErrAgentStorageError so the handler-side
// mapAgentError classifies them as CodeServerError (500) with the
// sanitized message (no raw decoder-string leak). This is the
// regression pin that closes the v3.5.2 review's "raw decode error
// leak" concern at the Compile/Invoke boundary.
package service

import (
	"errors"
	"testing"

	agenttool "ragflow/internal/agent/tool"
)

// TestDecodeCanvasFromDSL_EmptyDSL pins the "empty DSL" branch:
// decodeCanvasFromDSL(nil) must return an error wrapping
// ErrAgentStorageError without attempting json.Marshal (a nil map
// serialises to "null" and an empty one to "{}"; either would decode
// to an empty Canvas, bypassing the "no components" check).
func TestDecodeCanvasFromDSL_EmptyDSL(t *testing.T) {
	t.Parallel()
	_, err := decodeCanvasFromDSL(nil)
	if err == nil {
		t.Fatal("expected error for empty DSL")
	}
	if !errors.Is(err, ErrAgentStorageError) {
		t.Errorf("expected ErrAgentStorageError in chain, got %v", err)
	}
}

// TestDecodeCanvasFromDSL_EmptyMapSameAsNil pins the equivalent
// branch for an explicitly-empty (but non-nil) map. Same outcome
// as the nil case: the function must reject before marshalling.
func TestDecodeCanvasFromDSL_EmptyMapSameAsNil(t *testing.T) {
	t.Parallel()
	_, err := decodeCanvasFromDSL(map[string]any{})
	if err == nil {
		t.Fatal("expected error for empty DSL map")
	}
	if !errors.Is(err, ErrAgentStorageError) {
		t.Errorf("expected ErrAgentStorageError in chain, got %v", err)
	}
}

// TestDecodeCanvasFromDSL_SingleComponent pins the happy path:
// a DSL with one Begin component decodes to a *canvas.Canvas with
// that component under its id key.
func TestDecodeCanvasFromDSL_SingleComponent(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"components": map[string]any{
			"begin_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Begin",
					"params":         map[string]any{},
				},
				"downstream": []any{"message_0"},
				"upstream":   []any{},
			},
		},
		"path": []any{"begin_0"},
	}
	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if c == nil {
		t.Fatal("decoded Canvas is nil")
	}
	if len(c.Components) != 1 {
		t.Errorf("Components length = %d, want 1", len(c.Components))
	}
	if _, ok := c.Components["begin_0"]; !ok {
		t.Errorf("Components missing begin_0 key")
	}
	if len(c.Path) != 1 || c.Path[0] != "begin_0" {
		t.Errorf("Path = %v, want [begin_0]", c.Path)
	}
}

// TestDecodeCanvasFromDSL_MultiComponent pins the multi-node
// happy path: Begin → Retrieval → Message decodes into 3
// components, each kept under its id with its component_name
// intact. This is the shape real v1 DSLs use.
func TestDecodeCanvasFromDSL_MultiComponent(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"components": map[string]any{
			"begin_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Begin",
					"params":         map[string]any{},
				},
				"downstream": []any{"retrieval_0"},
			},
			"retrieval_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Retrieval",
					"params":         map[string]any{"kb_ids": []any{"kb-1"}},
				},
				"downstream": []any{"message_0"},
				"upstream":   []any{"begin_0"},
			},
			"message_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Message",
					"params":         map[string]any{"text": "hi {{begin_0@result}}"},
				},
				"upstream": []any{"retrieval_0"},
			},
		},
		"path": []any{"begin_0", "retrieval_0", "message_0"},
	}
	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if c == nil {
		t.Fatal("decoded Canvas is nil")
	}
	if len(c.Components) != 3 {
		t.Errorf("Components length = %d, want 3", len(c.Components))
	}
	for _, id := range []string{"begin_0", "retrieval_0", "message_0"} {
		if _, ok := c.Components[id]; !ok {
			t.Errorf("Components missing %s", id)
		}
	}
	if got := c.Components["retrieval_0"].Obj.ComponentName; got != "Retrieval" {
		t.Errorf("retrieval_0 component_name = %q, want Retrieval", got)
	}
}

// TestDecodeCanvasFromDSL_NoComponents pins the "non-empty map
// with no components" branch: a DSL whose top-level keys are
// non-component (e.g. only "globals") must still fail because the
// resulting Canvas would have an empty Components map. The error
// must chain ErrAgentStorageError so mapAgentError -> 500.
func TestDecodeCanvasFromDSL_NoComponents(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"globals": map[string]any{
			"sys.query": "",
		},
	}
	_, err := decodeCanvasFromDSL(dsl)
	if err == nil {
		t.Fatal("expected error when DSL has no components")
	}
	if !errors.Is(err, ErrAgentStorageError) {
		t.Errorf("expected ErrAgentStorageError in chain, got %v", err)
	}
}

// TestDecodeCanvasFromDSL_GlobalsPreserved pins that round-tripping
// the DSL through JSON does not lose top-level metadata keys
// (globals, history, retrieval), which state-pre / state-post
// handlers need at compile time to resolve references. Only globals
// is asserted here (sys.query and sys.user_id).
func TestDecodeCanvasFromDSL_GlobalsPreserved(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"components": map[string]any{
			"begin_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Begin",
					"params":         map[string]any{},
				},
				"downstream": []any{},
			},
		},
		"globals": map[string]any{
			"sys.query":       "hello",
			"sys.user_id":     "user-1",
			"env.counter":     0.0,
			"env.loop_done":   false,
			"env.sample_rows": []any{},
		},
	}
	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if c.Globals == nil {
		t.Fatal("Globals is nil after round-trip")
	}
	if got, _ := c.Globals["sys.query"].(string); got != "hello" {
		t.Errorf("Globals[sys.query] = %v, want \"hello\"", c.Globals["sys.query"])
	}
	if got, _ := c.Globals["sys.user_id"].(string); got != "user-1" {
		t.Errorf("Globals[sys.user_id] = %v, want \"user-1\"", c.Globals["sys.user_id"])
	}
}

// TestDecodeCanvasFromDSL_PreservesNodeParents pins that the
// graph.nodes[].parentId links survive decoding into
// Canvas.NodeParents: children grouped inside an Iteration map to
// their parent id, while the outer follower (Message:IterDone) gets
// no entry.
func TestDecodeCanvasFromDSL_PreservesNodeParents(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{"id": "Iteration:IterateList"},
				map[string]any{"id": "IterationItem:IterStart", "parentId": "Iteration:IterateList"},
				map[string]any{"id": "StringTransform:FmtItem", "parentId": "Iteration:IterateList"},
				map[string]any{"id": "Message:IterDone"},
			},
		},
		"components": map[string]any{
			"Iteration:IterateList": map[string]any{
				"obj": map[string]any{
					"component_name": "Parallel",
					"params":         map[string]any{"items_ref": "sys.items"},
				},
				"downstream": []any{"Message:IterDone"},
			},
			"IterationItem:IterStart": map[string]any{
				"obj": map[string]any{
					"component_name": "IterationItem",
					"params":         map[string]any{},
				},
				"downstream": []any{"StringTransform:FmtItem"},
			},
			"StringTransform:FmtItem": map[string]any{
				"obj": map[string]any{
					"component_name": "StringTransform",
					"params":         map[string]any{"method": "merge", "script": "{item}", "delimiters": []any{"|"}},
				},
			},
			"Message:IterDone": map[string]any{
				"obj": map[string]any{
					"component_name": "Message",
					"params":         map[string]any{"content": []any{"done"}},
				},
			},
		},
	}
	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if c.NodeParents["IterationItem:IterStart"] != "Iteration:IterateList" {
		t.Fatalf("IterationItem parent = %q, want Iteration:IterateList", c.NodeParents["IterationItem:IterStart"])
	}
	if c.NodeParents["StringTransform:FmtItem"] != "Iteration:IterateList" {
		t.Fatalf("FmtItem parent = %q, want Iteration:IterateList", c.NodeParents["StringTransform:FmtItem"])
	}
	if _, ok := c.NodeParents["Message:IterDone"]; ok {
		t.Fatalf("outer follower Message:IterDone should not be marked as a grouped child")
	}
}

// TestNewAgentServiceWithOptions_NilOptions pins that the new
// constructor accepts all-nil options and produces a usable
// service (no panic on field init, no nil-deref when running
// without Redis). Existing test sites rely on this; the
// zero-arg NewAgentService() now just delegates here.
func TestNewAgentServiceWithOptions_NilOptions(t *testing.T) {
	t.Parallel()
	svc := NewAgentServiceWithOptions(nil, nil, nil)
	if svc == nil {
		t.Fatal("NewAgentServiceWithOptions returned nil")
	}
	if svc.runner == nil {
		t.Error("runner is nil after construction")
	}
	if svc.checkpointStore != nil {
		t.Error("checkpointStore should be nil when constructed with nil")
	}
	if svc.stateSerializer != nil {
		t.Error("stateSerializer should be nil when constructed with nil")
	}
	if svc.runTracker != nil {
		t.Error("runTracker should be nil when constructed with nil")
	}
	if svc.canvasDAO == nil || svc.versionDAO == nil {
		t.Error("DAOs should be non-nil after construction")
	}
}

// TestNewAgentService_DefaultsToNilOptions pins that the legacy
// zero-arg NewAgentService() is functionally equivalent to
// NewAgentServiceWithOptions(nil, nil, nil): the field values
// must match exactly so existing call sites don't observe a
// behavioural change.
func TestNewAgentService_DefaultsToNilOptions(t *testing.T) {
	t.Parallel()
	a := NewAgentService()
	b := NewAgentServiceWithOptions(nil, nil, nil)
	if a.checkpointStore != b.checkpointStore {
		t.Errorf("checkpointStore mismatch: %v vs %v", a.checkpointStore, b.checkpointStore)
	}
	if a.stateSerializer != b.stateSerializer {
		t.Errorf("stateSerializer mismatch: %v vs %v", a.stateSerializer, b.stateSerializer)
	}
	if a.runTracker != b.runTracker {
		t.Errorf("runTracker mismatch: %v vs %v", a.runTracker, b.runTracker)
	}
}
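
// TestNewAgentService_RegistersSandboxClient pins the boot wiring:
// with the package-level sandbox client reset to nil, NewAgentService
// must register a real (non-nil, non-stub) client. The test mutates
// package-level state, so it does not call t.Parallel().
func TestNewAgentService_RegistersSandboxClient(t *testing.T) {
	agenttool.SetSandboxClient(nil)
	t.Cleanup(func() { agenttool.SetSandboxClient(nil) })
	_ = NewAgentService()
	client := agenttool.GetSandboxClient()
	if client == nil {
		t.Fatal("sandbox client is nil after NewAgentService boot wiring")
	}
	if stub, ok := client.(interface{ IsStubSandboxClient() bool }); ok && stub.IsStubSandboxClient() {
		t.Fatal("sandbox client remained stub after NewAgentService boot wiring")
	}
}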