Files
ragflow/internal/agent/dsl/normalize_test.go
Zhichang Yu 3f805a64f1 feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)
## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.

The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.

### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.

### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.

### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.

## Out of scope (intentionally)

- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).

`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00

706 lines
22 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dsl
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
)
// fixturesDir resolves the testdata/ directory for the dsl package. The
// production fixtures live at internal/agent/dsl/testdata/ (after the
// v1_examples/ flatten). Helpers and tests below walk that directory
// directly so a new fixture is picked up automatically.
func fixturesDir(t *testing.T) string {
t.Helper()
return filepath.Join("testdata")
}
// loadFixture reads a JSON file from internal/agent/dsl/testdata into a
// map[string]any. The function t.Skip()s (not t.Fatal) on a missing
// file so a test run on a slim checkout still goes green.
func loadFixture(t *testing.T, name string) map[string]any {
t.Helper()
raw, err := os.ReadFile(filepath.Join(fixturesDir(t), name))
if err != nil {
t.Skipf("fixture %s not readable: %v", name, err)
}
var m map[string]any
if err := json.Unmarshal(raw, &m); err != nil {
t.Fatalf("[%s] parse: %v", name, err)
}
return m
}
// TestNormalize_NoopWhenGraphPresent guards against accidentally
// clobbering a payload the front-end just saved. Any input with a
// non-empty `graph.nodes` must round-trip with `graph` untouched.
func TestNormalize_NoopWhenGraphPresent(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{"id": "begin", "type": "beginNode"},
},
"edges": []any{},
},
"components": map[string]any{},
}
out := NormalizeForCanvas(in)
got, _ := out["graph"].(map[string]any)
if got == nil {
t.Fatal("graph block disappeared")
}
if nodes, _ := got["nodes"].([]any); len(nodes) != 1 {
t.Errorf("graph.nodes length = %d, want 1", len(nodes))
}
}
// TestNormalize_BuildsGraphFromComponents verifies the
// components → graph derivation path. Given a populated `components`
// block but no `graph`, the function should produce a deterministic
// graph with one node per component and one edge per downstream
// declaration. The graph node's `data.form` must always be present
// (even when empty) so the front-end's React Flow shape is stable.
func TestNormalize_BuildsGraphFromComponents(t *testing.T) {
in := map[string]any{
"components": map[string]any{
"begin": map[string]any{
"obj": map[string]any{"component_name": "Begin", "params": map[string]any{}},
"downstream": []any{"llm:0"},
},
"llm:0": map[string]any{
"obj": map[string]any{"component_name": "LLM", "params": map[string]any{"k": "v"}},
"downstream": []any{},
},
},
}
out := NormalizeForCanvas(in)
graph, _ := out["graph"].(map[string]any)
if graph == nil {
t.Fatal("graph not derived from components")
}
nodes, _ := graph["nodes"].([]any)
if len(nodes) != 2 {
t.Fatalf("graph.nodes length = %d, want 2", len(nodes))
}
edges, _ := graph["edges"].([]any)
if len(edges) != 1 {
t.Fatalf("graph.edges length = %d, want 1", len(edges))
}
// Each derived node must carry data.form, even when the source
// params is empty — this is the front-end's React-Flow invariant.
for _, n := range nodes {
nm, _ := n.(map[string]any)
if nm == nil {
continue
}
data, _ := nm["data"].(map[string]any)
if data == nil {
t.Errorf("node %v missing data block", nm["id"])
continue
}
if _, ok := data["form"]; !ok {
t.Errorf("node %v data.form missing", nm["id"])
}
}
}
// TestNormalize_DeterministicNodeOrder guards against the Go runtime
// map iteration lottery producing a different graph layout on
// successive normalize passes. The function must sort the component
// ids before iterating so the layout (x = 50 + i*350) is a stable
// function of the input dsl.
func TestNormalize_DeterministicNodeOrder(t *testing.T) {
comps := map[string]any{}
for _, k := range []string{"z", "a", "m", "b"} {
comps[k] = map[string]any{
"obj": map[string]any{"component_name": "Begin", "params": map[string]any{}},
"downstream": []any{},
}
}
first := NormalizeForCanvas(map[string]any{"components": comps})
second := NormalizeForCanvas(map[string]any{"components": comps})
idsFirst := nodeIDs(t, first)
idsSecond := nodeIDs(t, second)
if !equalStringSlices(idsFirst, idsSecond) {
t.Errorf("non-deterministic order: first=%v second=%v", idsFirst, idsSecond)
}
// Sorted order expected.
want := []string{"a", "b", "m", "z"}
if !equalStringSlices(idsFirst, want) {
t.Errorf("node order = %v, want %v", idsFirst, want)
}
}
// TestNormalize_HandleIdsEnforced ensures source/target handle ids
// match the front-end's React Flow convention (source=start,
// target=end) regardless of which value the writer used. Agent/tool
// handles (non-"end"/"start" ids) are left alone.
func TestNormalize_HandleIdsEnforced(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{"id": "a", "type": "beginNode"},
map[string]any{"id": "b", "type": "messageNode"},
},
"edges": []any{
// Inverted: source uses "end" instead of "start".
map[string]any{
"id": "e1",
"source": "a",
"target": "b",
"sourceHandle": "end",
"targetHandle": "start",
},
// Agent/tool handle: must NOT be touched.
map[string]any{
"id": "e2",
"source": "a",
"target": "b",
"sourceHandle": "tool-1",
"targetHandle": "tool-1",
},
},
},
"components": map[string]any{},
}
out := NormalizeForCanvas(in)
edges, _ := out["graph"].(map[string]any)["edges"].([]any)
if len(edges) != 2 {
t.Fatalf("edges length = %d, want 2", len(edges))
}
e1, _ := edges[0].(map[string]any)
if e1["sourceHandle"] != "start" {
t.Errorf("e1 sourceHandle = %v, want start", e1["sourceHandle"])
}
if e1["targetHandle"] != "end" {
t.Errorf("e1 targetHandle = %v, want end", e1["targetHandle"])
}
e2, _ := edges[1].(map[string]any)
if e2["sourceHandle"] != "tool-1" {
t.Errorf("e2 sourceHandle = %v, want tool-1 (preserved)", e2["sourceHandle"])
}
if e2["targetHandle"] != "tool-1" {
t.Errorf("e2 targetHandle = %v, want tool-1 (preserved)", e2["targetHandle"])
}
}
// TestNormalizeForRun_FoldsLoopAndIteration is the runtime
// compatibility step: a dsl carrying the legacy Loop+LoopItem or
// Iteration+IterationItem node pair must be folded into a single
// Loop/Parallel node, with the child node removed and its downstream
// merged into the parent. Iteration parents are also renamed to
// "Parallel" so downstream compile/expand paths only see modern names.
//
// Two cases are exercised end-to-end:
//
// 1. Loop:abc + LoopItem:def + Body:1 → Loop:abc + Body:1
// (child dropped; Body:1 appended to parent.downstream)
// 2. Iteration:abc + IterationItem:def + Body:1 → Parallel:abc + Body:1
// (child dropped; parent renamed to Parallel; Body:1 appended)
func TestNormalizeForRun_FoldsLoopAndIteration(t *testing.T) {
t.Run("LoopPlusLoopItem", func(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{"id": "Loop:abc", "type": "loopNode"},
map[string]any{"id": "LoopItem:def", "type": "loopStartNode", "parentId": "Loop:abc"},
map[string]any{"id": "Body:1", "type": "messageNode"},
},
"edges": []any{},
},
"components": map[string]any{
"Loop:abc": map[string]any{
"obj": map[string]any{"component_name": "Loop", "params": map[string]any{"k": "v"}},
"downstream": []any{"LoopItem:def"},
},
"LoopItem:def": map[string]any{
"obj": map[string]any{"component_name": "LoopItem", "params": map[string]any{}},
"downstream": []any{"Body:1"},
},
"Body:1": map[string]any{
"obj": map[string]any{"component_name": "Message", "params": map[string]any{}},
"downstream": []any{},
},
},
}
out := NormalizeForRun(in)
comps, _ := out["components"].(map[string]any)
if _, dropped := comps["LoopItem:def"]; dropped {
t.Error("LoopItem:def should be folded away")
}
parent, _ := comps["Loop:abc"].(map[string]any)
if parent == nil {
t.Fatal("Loop:abc missing after fold")
}
ds, _ := parent["downstream"].([]any)
gotDS := stringSliceAny(ds)
if !equalStringSlices(gotDS, []string{"Body:1"}) {
t.Errorf("parent.downstream = %v, want [Body:1]", gotDS)
}
})
t.Run("IterationPlusIterationItem", func(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{"id": "Iteration:abc", "type": "iterationNode"},
map[string]any{"id": "IterationItem:def", "type": "iterationStartNode", "parentId": "Iteration:abc"},
map[string]any{"id": "Body:1", "type": "messageNode"},
},
"edges": []any{},
},
"components": map[string]any{
"Iteration:abc": map[string]any{
"obj": map[string]any{"component_name": "Iteration", "params": map[string]any{"items_ref": "x"}},
"downstream": []any{"IterationItem:def"},
},
"IterationItem:def": map[string]any{
"obj": map[string]any{"component_name": "IterationItem", "params": map[string]any{}},
"downstream": []any{"Body:1"},
},
"Body:1": map[string]any{
"obj": map[string]any{"component_name": "Message", "params": map[string]any{}},
"downstream": []any{},
},
},
}
out := NormalizeForRun(in)
comps, _ := out["components"].(map[string]any)
if _, dropped := comps["IterationItem:def"]; dropped == false {
// has anything, we want it dropped
if comps["IterationItem:def"] != nil {
t.Error("IterationItem:def should be folded away")
}
}
parent, _ := comps["Iteration:abc"].(map[string]any)
if parent == nil {
t.Fatal("Iteration:abc missing after fold")
}
// Renamed to "Parallel".
if obj, _ := parent["obj"].(map[string]any); obj != nil {
if obj["component_name"] != "Parallel" {
t.Errorf("parent.obj.component_name = %v, want Parallel", obj["component_name"])
}
}
ds, _ := parent["downstream"].([]any)
gotDS := stringSliceAny(ds)
if !equalStringSlices(gotDS, []string{"Body:1"}) {
t.Errorf("parent.downstream = %v, want [Body:1]", gotDS)
}
// Graph node label also renamed so the front-end's
// componentNameToNodeTypeMap lookup ("Parallel" →
// "parallelNode") succeeds.
graph, _ := out["graph"].(map[string]any)
if graph != nil {
nodes, _ := graph["nodes"].([]any)
for _, n := range nodes {
nm, _ := n.(map[string]any)
if nm == nil {
continue
}
if nm["id"] != "Iteration:abc" {
continue
}
if data, _ := nm["data"].(map[string]any); data != nil {
if data["label"] != "Parallel" {
t.Errorf("graph node data.label = %v, want Parallel", data["label"])
}
}
if nm["type"] != "parallelNode" {
t.Errorf("graph node type = %v, want parallelNode", nm["type"])
}
}
}
})
}
func TestNormalizeForRun_RewritesLegacyIterationAliases(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{"id": "Iteration:abc", "type": "iterationNode"},
map[string]any{"id": "IterationItem:def", "type": "iterationStartNode", "parentId": "Iteration:abc"},
map[string]any{"id": "Body:1", "type": "messageNode"},
},
"edges": []any{},
},
"components": map[string]any{
"Iteration:abc": map[string]any{
"obj": map[string]any{
"component_name": "Iteration",
"params": map[string]any{
"items_ref": "sys.arr",
"outputs": map[string]any{
"lines": map[string]any{
"ref": "StringTransform:FmtItem@result",
"type": "Array<string>",
},
},
},
},
"downstream": []any{"IterationItem:def", "Done:1"},
},
"IterationItem:def": map[string]any{
"obj": map[string]any{"component_name": "IterationItem", "params": map[string]any{}},
"downstream": []any{"Body:1"},
},
"Body:1": map[string]any{
"obj": map[string]any{
"component_name": "Message",
"params": map[string]any{
"content": []any{
"{IterationItem:def@index}: {IterationItem:def@item}",
},
},
},
"downstream": []any{},
},
"Done:1": map[string]any{
"obj": map[string]any{
"component_name": "Message",
"params": map[string]any{
"content": []any{
"done {Iteration:abc@lines}",
},
},
},
"downstream": []any{},
},
},
}
out := NormalizeForRun(in)
comps, _ := out["components"].(map[string]any)
body, _ := comps["Body:1"].(map[string]any)
if body == nil {
t.Fatal("Body:1 missing after normalize")
}
obj, _ := body["obj"].(map[string]any)
params, _ := obj["params"].(map[string]any)
content, _ := params["content"].([]any)
if len(content) != 1 {
t.Fatalf("content len = %d, want 1", len(content))
}
got, _ := content[0].(string)
want := "{index}: {item}"
if got != want {
t.Fatalf("legacy iteration aliases: got %q, want %q", got, want)
}
}
// TestNormalizeForCanvas_PreservesIterationCanvasShape pins the
// front-end protocol boundary: canvas-facing normalization may repair
// graph structure, but it must not rename Iteration to Parallel or
// emit the internal parallelNode type.
func TestNormalizeForCanvas_PreservesIterationCanvasShape(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{
"id": "Iteration:abc",
"type": "iterationNode",
"data": map[string]any{"label": "Iteration", "name": "Iteration"},
},
map[string]any{
"id": "IterationItem:def",
"type": "iterationStartNode",
"parentId": "Iteration:abc",
"data": map[string]any{"label": "IterationItem", "name": "IterationItem"},
},
},
"edges": []any{},
},
"components": map[string]any{
"Iteration:abc": map[string]any{
"obj": map[string]any{"component_name": "Iteration", "params": map[string]any{"items_ref": "x"}},
"downstream": []any{"IterationItem:def"},
},
"IterationItem:def": map[string]any{
"obj": map[string]any{"component_name": "IterationItem", "params": map[string]any{}},
"downstream": []any{},
},
},
}
out := NormalizeForCanvas(in)
comps, _ := out["components"].(map[string]any)
parent, _ := comps["Iteration:abc"].(map[string]any)
if parent == nil {
t.Fatal("Iteration:abc missing after canvas normalize")
}
if obj, _ := parent["obj"].(map[string]any); obj != nil {
if obj["component_name"] != "Iteration" {
t.Errorf("canvas normalize renamed component_name = %v, want Iteration", obj["component_name"])
}
}
if _, ok := comps["IterationItem:def"]; !ok {
t.Error("canvas normalize folded away IterationItem:def; want preserved")
}
graph, _ := out["graph"].(map[string]any)
nodes, _ := graph["nodes"].([]any)
for _, n := range nodes {
nm, _ := n.(map[string]any)
if nm == nil || nm["id"] != "Iteration:abc" {
continue
}
if nm["type"] != "iterationNode" {
t.Errorf("canvas normalize graph node type = %v, want iterationNode", nm["type"])
}
if data, _ := nm["data"].(map[string]any); data != nil {
if data["label"] != "Iteration" {
t.Errorf("canvas normalize graph label = %v, want Iteration", data["label"])
}
}
}
}
// TestNormalizeForCanvas_RepairsLeakedParallelShape verifies that a
// historically polluted stored DSL is repaired on read before it goes
// back to the front-end: internal Parallel / parallelNode must be
// mapped back to Iteration / iterationNode without mutating the input.
func TestNormalizeForCanvas_RepairsLeakedParallelShape(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{
"id": "Iteration:abc",
"type": "parallelNode",
"data": map[string]any{"label": "Parallel", "name": "Parallel"},
},
},
"edges": []any{},
},
"components": map[string]any{
"Iteration:abc": map[string]any{
"obj": map[string]any{
"component_name": "Parallel",
"params": map[string]any{},
},
"downstream": []any{},
"upstream": []any{},
},
},
}
out := NormalizeForCanvas(in)
comps, _ := out["components"].(map[string]any)
parent, _ := comps["Iteration:abc"].(map[string]any)
obj, _ := parent["obj"].(map[string]any)
if obj["component_name"] != "Iteration" {
t.Errorf("component_name = %v, want Iteration", obj["component_name"])
}
graph, _ := out["graph"].(map[string]any)
nodes, _ := graph["nodes"].([]any)
node, _ := nodes[0].(map[string]any)
if node["type"] != "iterationNode" {
t.Errorf("graph node type = %v, want iterationNode", node["type"])
}
data, _ := node["data"].(map[string]any)
if data["label"] != "Iteration" {
t.Errorf("graph node label = %v, want Iteration", data["label"])
}
if data["name"] != "Iteration" {
t.Errorf("graph node name = %v, want Iteration", data["name"])
}
origNode := in["graph"].(map[string]any)["nodes"].([]any)[0].(map[string]any)
if origNode["type"] != "parallelNode" {
t.Errorf("input node type mutated to %v", origNode["type"])
}
}
// TestNormalize_DoesNotMutateInput pins the documented
// "never mutates its input" contract. The original DSL map's
// graph.edges[*].sourceHandle / targetHandle, components
// entries, and components[*].obj.component_name must all be
// unchanged after NormalizeForCanvas returns.
func TestNormalize_DoesNotMutateInput(t *testing.T) {
in := map[string]any{
"graph": map[string]any{
"nodes": []any{
map[string]any{"id": "begin", "type": "beginNode"},
},
"edges": []any{
map[string]any{
"id": "e1",
"source": "begin",
"target": "begin",
"sourceHandle": "end", // inverted — to be rewritten
"targetHandle": "start",
},
},
},
"components": map[string]any{
"Iteration:abc": map[string]any{
"obj": map[string]any{
"component_name": "Iteration", // to be renamed to Parallel
"params": map[string]any{"items_ref": "x"},
},
"downstream": []any{"IterationItem:def"},
},
"IterationItem:def": map[string]any{
"obj": map[string]any{
"component_name": "IterationItem",
"params": map[string]any{},
},
"downstream": []any{"Body:1"},
},
},
}
// Snapshot the original before NormalizeForCanvas runs.
origEdge := in["graph"].(map[string]any)["edges"].([]any)[0].(map[string]any)
origSourceHandle := origEdge["sourceHandle"]
origTargetHandle := origEdge["targetHandle"]
origIterObj := in["components"].(map[string]any)["Iteration:abc"].(map[string]any)["obj"].(map[string]any)
origIterName := origIterObj["component_name"]
iterItemKey := "IterationItem:def"
hadIterItem := false
if _, ok := in["components"].(map[string]any)[iterItemKey]; ok {
hadIterItem = true
}
// Run the normalizer.
_ = NormalizeForRun(in)
// (1) graph.edges[*].sourceHandle / targetHandle must NOT be
// rewritten in the original input.
if got := origEdge["sourceHandle"]; got != origSourceHandle {
t.Errorf("input edge sourceHandle mutated: %v -> %v", origSourceHandle, got)
}
if got := origEdge["targetHandle"]; got != origTargetHandle {
t.Errorf("input edge targetHandle mutated: %v -> %v", origTargetHandle, got)
}
// (2) components[*].obj.component_name must NOT be renamed
// in the original input.
if got := origIterObj["component_name"]; got != origIterName {
t.Errorf("input components[Itr].obj.component_name mutated: %v -> %v", origIterName, got)
}
// (3) The legacy child component must still be present in
// the original components map (fold deletes from the COPY).
if !hadIterItem {
t.Errorf("input was missing IterationItem:def before normalize — fixture bug")
}
if _, ok := in["components"].(map[string]any)[iterItemKey]; !ok {
t.Errorf("input components[IterationItem:def] was deleted by fold — input was mutated")
}
}
// TestNormalize_FixtureSmoke walks the on-disk testdata/ directory
// and runs every fixture through NormalizeForCanvas. The dsl_examples e2e
// suite enumerates the same fixture names; keeping the two lists in
// sync is the responsibility of internal/agent/canvas/dsl_examples_test.go
// — the comment in dsl_examples_test.go is the single source of truth.
func TestNormalize_FixtureSmoke(t *testing.T) {
dir := fixturesDir(t)
entries, err := os.ReadDir(dir)
if err != nil {
t.Skipf("testdata/ not readable: %v", err)
}
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
continue
}
name := e.Name()
t.Run(name, func(t *testing.T) {
raw := loadFixture(t, name)
out := NormalizeForRun(raw)
if out == nil {
t.Fatalf("[%s] normalize returned nil", name)
}
// Must have either graph or components at minimum.
_, hasGraph := out["graph"].(map[string]any)
comps, hasComps := out["components"].(map[string]any)
if !hasGraph && !hasComps {
t.Errorf("[%s] normalize produced neither graph nor components", name)
}
// If components survived on the runtime-normalized view, no
// Iteration / LoopItem / IterationItem may linger — those are folded.
if hasComps {
for id, raw := range comps {
comp, _ := raw.(map[string]any)
if comp == nil {
continue
}
if obj, _ := comp["obj"].(map[string]any); obj != nil {
switch obj["component_name"] {
case "LoopItem", "IterationItem":
t.Errorf("[%s] component %q still has legacy name %q", name, id, obj["component_name"])
case "Iteration":
t.Errorf("[%s] component %q still has pre-rename name %q", name, id, obj["component_name"])
}
}
}
}
})
}
}
// ----- helpers -----
func nodeIDs(t *testing.T, dsl map[string]any) []string {
t.Helper()
graph, _ := dsl["graph"].(map[string]any)
if graph == nil {
return nil
}
nodes, _ := graph["nodes"].([]any)
ids := make([]string, 0, len(nodes))
for _, n := range nodes {
nm, _ := n.(map[string]any)
if nm == nil {
continue
}
if id, _ := nm["id"].(string); id != "" {
ids = append(ids, id)
}
}
return ids
}
func stringSliceAny(v []any) []string {
out := make([]string, 0, len(v))
for _, x := range v {
if s, ok := x.(string); ok {
out = append(out, s)
}
}
return out
}
func equalStringSlices(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}