Mirror of https://github.com/infiniflow/ragflow.git (synced 2026-06-30 07:51:10 +08:00)
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.
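
The normalize tests further down in this file pin one property of `normalize.go`: when a graph is derived from `components`, the component ids are sorted before iterating so the layout (x = 50 + i*350) is a stable function of the input. A minimal sketch of that idea follows; `layoutNodes` and the `node` type are hypothetical names for illustration, not the actual `NormalizeForCanvas` internals.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical, reduced stand-in for a derived graph node.
type node struct {
	ID string
	X  int
}

// layoutNodes sorts the component ids first, so Go's randomized map
// iteration order cannot leak into the resulting layout.
func layoutNodes(components map[string]any) []node {
	ids := make([]string, 0, len(components))
	for id := range components {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	nodes := make([]node, 0, len(ids))
	for i, id := range ids {
		// x = 50 + i*350, the formula quoted in the test comment.
		nodes = append(nodes, node{ID: id, X: 50 + i*350})
	}
	return nodes
}

func main() {
	comps := map[string]any{"z": nil, "a": nil, "m": nil, "b": nil}
	fmt.Println(layoutNodes(comps))
}
```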
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
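
As a rough illustration of the `{{cpn_id@param}}` model, the regex fast path might look like the sketch below. The pattern, the flat `vars` map, and the choice to return an error on an unresolved reference are all assumptions made for this example; the real `ResolveTemplate` also has a gonja fallback that is not shown.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches {{cpn_id@param}} and {{sys.query}} style references.
// The accepted character set is an assumption for this sketch.
var refPattern = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_:.-]+(?:@[A-Za-z0-9_.-]+)?)\s*\}\}`)

// resolveTemplate substitutes every reference found in tmpl with the
// matching entry in vars, and fails on the first unknown reference.
func resolveTemplate(tmpl string, vars map[string]any) (string, error) {
	missing := ""
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			if missing == "" {
				missing = key
			}
			return m
		}
		return fmt.Sprint(v)
	})
	if missing != "" {
		return "", fmt.Errorf("unresolved reference %q", missing)
	}
	return out, nil
}

func main() {
	vars := map[string]any{"sys.query": "hello", "llm:0@content": "world"}
	fmt.Println(resolveTemplate("{{sys.query}}, {{llm:0@content}}", vars))
}
```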
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
706 lines
22 KiB
Go
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dsl

import (
	"encoding/json"
	"os"
	"path/filepath"
	"strings"
	"testing"
)

// fixturesDir resolves the testdata/ directory for the dsl package. The
// production fixtures live at internal/agent/dsl/testdata/ (after the
// v1_examples/ flatten). Helpers and tests below walk that directory
// directly so a new fixture is picked up automatically.
func fixturesDir(t *testing.T) string {
	t.Helper()
	return filepath.Join("testdata")
}

// loadFixture reads a JSON file from internal/agent/dsl/testdata into a
// map[string]any. The function t.Skip()s (not t.Fatal) on a missing
// file so a test run on a slim checkout still goes green.
func loadFixture(t *testing.T, name string) map[string]any {
	t.Helper()
	raw, err := os.ReadFile(filepath.Join(fixturesDir(t), name))
	if err != nil {
		t.Skipf("fixture %s not readable: %v", name, err)
	}
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		t.Fatalf("[%s] parse: %v", name, err)
	}
	return m
}

// TestNormalize_NoopWhenGraphPresent guards against accidentally
// clobbering a payload the front-end just saved. Any input with a
// non-empty `graph.nodes` must round-trip with `graph` untouched.
func TestNormalize_NoopWhenGraphPresent(t *testing.T) {
	in := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{"id": "begin", "type": "beginNode"},
			},
			"edges": []any{},
		},
		"components": map[string]any{},
	}
	out := NormalizeForCanvas(in)
	got, _ := out["graph"].(map[string]any)
	if got == nil {
		t.Fatal("graph block disappeared")
	}
	if nodes, _ := got["nodes"].([]any); len(nodes) != 1 {
		t.Errorf("graph.nodes length = %d, want 1", len(nodes))
	}
}

// TestNormalize_BuildsGraphFromComponents verifies the
// components → graph derivation path. Given a populated `components`
// block but no `graph`, the function should produce a deterministic
// graph with one node per component and one edge per downstream
// declaration. The graph node's `data.form` must always be present
// (even when empty) so the front-end's React Flow shape is stable.
func TestNormalize_BuildsGraphFromComponents(t *testing.T) {
	in := map[string]any{
		"components": map[string]any{
			"begin": map[string]any{
				"obj":        map[string]any{"component_name": "Begin", "params": map[string]any{}},
				"downstream": []any{"llm:0"},
			},
			"llm:0": map[string]any{
				"obj":        map[string]any{"component_name": "LLM", "params": map[string]any{"k": "v"}},
				"downstream": []any{},
			},
		},
	}
	out := NormalizeForCanvas(in)
	graph, _ := out["graph"].(map[string]any)
	if graph == nil {
		t.Fatal("graph not derived from components")
	}
	nodes, _ := graph["nodes"].([]any)
	if len(nodes) != 2 {
		t.Fatalf("graph.nodes length = %d, want 2", len(nodes))
	}
	edges, _ := graph["edges"].([]any)
	if len(edges) != 1 {
		t.Fatalf("graph.edges length = %d, want 1", len(edges))
	}
	// Each derived node must carry data.form, even when the source
	// params is empty: this is the front-end's React Flow invariant.
	for _, n := range nodes {
		nm, _ := n.(map[string]any)
		if nm == nil {
			continue
		}
		data, _ := nm["data"].(map[string]any)
		if data == nil {
			t.Errorf("node %v missing data block", nm["id"])
			continue
		}
		if _, ok := data["form"]; !ok {
			t.Errorf("node %v data.form missing", nm["id"])
		}
	}
}

// TestNormalize_DeterministicNodeOrder guards against the Go runtime
// map iteration lottery producing a different graph layout on
// successive normalize passes. The function must sort the component
// ids before iterating so the layout (x = 50 + i*350) is a stable
// function of the input dsl.
func TestNormalize_DeterministicNodeOrder(t *testing.T) {
	comps := map[string]any{}
	for _, k := range []string{"z", "a", "m", "b"} {
		comps[k] = map[string]any{
			"obj":        map[string]any{"component_name": "Begin", "params": map[string]any{}},
			"downstream": []any{},
		}
	}
	first := NormalizeForCanvas(map[string]any{"components": comps})
	second := NormalizeForCanvas(map[string]any{"components": comps})

	idsFirst := nodeIDs(t, first)
	idsSecond := nodeIDs(t, second)
	if !equalStringSlices(idsFirst, idsSecond) {
		t.Errorf("non-deterministic order: first=%v second=%v", idsFirst, idsSecond)
	}
	// Sorted order expected.
	want := []string{"a", "b", "m", "z"}
	if !equalStringSlices(idsFirst, want) {
		t.Errorf("node order = %v, want %v", idsFirst, want)
	}
}

// TestNormalize_HandleIdsEnforced ensures source/target handle ids
// match the front-end's React Flow convention (source=start,
// target=end) regardless of which value the writer used. Agent/tool
// handles (non-"end"/"start" ids) are left alone.
func TestNormalize_HandleIdsEnforced(t *testing.T) {
	in := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{"id": "a", "type": "beginNode"},
				map[string]any{"id": "b", "type": "messageNode"},
			},
			"edges": []any{
				// Inverted: source uses "end" instead of "start".
				map[string]any{
					"id":           "e1",
					"source":       "a",
					"target":       "b",
					"sourceHandle": "end",
					"targetHandle": "start",
				},
				// Agent/tool handle: must NOT be touched.
				map[string]any{
					"id":           "e2",
					"source":       "a",
					"target":       "b",
					"sourceHandle": "tool-1",
					"targetHandle": "tool-1",
				},
			},
		},
		"components": map[string]any{},
	}
	out := NormalizeForCanvas(in)
	edges, _ := out["graph"].(map[string]any)["edges"].([]any)
	if len(edges) != 2 {
		t.Fatalf("edges length = %d, want 2", len(edges))
	}
	e1, _ := edges[0].(map[string]any)
	if e1["sourceHandle"] != "start" {
		t.Errorf("e1 sourceHandle = %v, want start", e1["sourceHandle"])
	}
	if e1["targetHandle"] != "end" {
		t.Errorf("e1 targetHandle = %v, want end", e1["targetHandle"])
	}
	e2, _ := edges[1].(map[string]any)
	if e2["sourceHandle"] != "tool-1" {
		t.Errorf("e2 sourceHandle = %v, want tool-1 (preserved)", e2["sourceHandle"])
	}
	if e2["targetHandle"] != "tool-1" {
		t.Errorf("e2 targetHandle = %v, want tool-1 (preserved)", e2["targetHandle"])
	}
}

// TestNormalizeForRun_FoldsLoopAndIteration is the runtime
// compatibility step: a dsl carrying the legacy Loop+LoopItem or
// Iteration+IterationItem node pair must be folded into a single
// Loop/Parallel node, with the child node removed and its downstream
// merged into the parent. Iteration parents are also renamed to
// "Parallel" so downstream compile/expand paths only see modern names.
//
// Two cases are exercised end-to-end:
//
//  1. Loop:abc + LoopItem:def + Body:1 → Loop:abc + Body:1
//     (child dropped; Body:1 appended to parent.downstream)
//  2. Iteration:abc + IterationItem:def + Body:1 → Parallel:abc + Body:1
//     (child dropped; parent renamed to Parallel; Body:1 appended)
func TestNormalizeForRun_FoldsLoopAndIteration(t *testing.T) {
	t.Run("LoopPlusLoopItem", func(t *testing.T) {
		in := map[string]any{
			"graph": map[string]any{
				"nodes": []any{
					map[string]any{"id": "Loop:abc", "type": "loopNode"},
					map[string]any{"id": "LoopItem:def", "type": "loopStartNode", "parentId": "Loop:abc"},
					map[string]any{"id": "Body:1", "type": "messageNode"},
				},
				"edges": []any{},
			},
			"components": map[string]any{
				"Loop:abc": map[string]any{
					"obj":        map[string]any{"component_name": "Loop", "params": map[string]any{"k": "v"}},
					"downstream": []any{"LoopItem:def"},
				},
				"LoopItem:def": map[string]any{
					"obj":        map[string]any{"component_name": "LoopItem", "params": map[string]any{}},
					"downstream": []any{"Body:1"},
				},
				"Body:1": map[string]any{
					"obj":        map[string]any{"component_name": "Message", "params": map[string]any{}},
					"downstream": []any{},
				},
			},
		}
		out := NormalizeForRun(in)
		comps, _ := out["components"].(map[string]any)
		// The child must be gone from the folded components map.
		if _, present := comps["LoopItem:def"]; present {
			t.Error("LoopItem:def should be folded away")
		}
		parent, _ := comps["Loop:abc"].(map[string]any)
		if parent == nil {
			t.Fatal("Loop:abc missing after fold")
		}
		ds, _ := parent["downstream"].([]any)
		gotDS := stringSliceAny(ds)
		if !equalStringSlices(gotDS, []string{"Body:1"}) {
			t.Errorf("parent.downstream = %v, want [Body:1]", gotDS)
		}
	})

	t.Run("IterationPlusIterationItem", func(t *testing.T) {
		in := map[string]any{
			"graph": map[string]any{
				"nodes": []any{
					map[string]any{"id": "Iteration:abc", "type": "iterationNode"},
					map[string]any{"id": "IterationItem:def", "type": "iterationStartNode", "parentId": "Iteration:abc"},
					map[string]any{"id": "Body:1", "type": "messageNode"},
				},
				"edges": []any{},
			},
			"components": map[string]any{
				"Iteration:abc": map[string]any{
					"obj":        map[string]any{"component_name": "Iteration", "params": map[string]any{"items_ref": "x"}},
					"downstream": []any{"IterationItem:def"},
				},
				"IterationItem:def": map[string]any{
					"obj":        map[string]any{"component_name": "IterationItem", "params": map[string]any{}},
					"downstream": []any{"Body:1"},
				},
				"Body:1": map[string]any{
					"obj":        map[string]any{"component_name": "Message", "params": map[string]any{}},
					"downstream": []any{},
				},
			},
		}
		out := NormalizeForRun(in)
		comps, _ := out["components"].(map[string]any)
		// The child must be gone from the folded components map. The
		// earlier form of this check could never fail: it only looked
		// at the value when the key was already absent.
		if _, present := comps["IterationItem:def"]; present {
			t.Error("IterationItem:def should be folded away")
		}
		parent, _ := comps["Iteration:abc"].(map[string]any)
		if parent == nil {
			t.Fatal("Iteration:abc missing after fold")
		}
		// Renamed to "Parallel".
		if obj, _ := parent["obj"].(map[string]any); obj != nil {
			if obj["component_name"] != "Parallel" {
				t.Errorf("parent.obj.component_name = %v, want Parallel", obj["component_name"])
			}
		}
		ds, _ := parent["downstream"].([]any)
		gotDS := stringSliceAny(ds)
		if !equalStringSlices(gotDS, []string{"Body:1"}) {
			t.Errorf("parent.downstream = %v, want [Body:1]", gotDS)
		}
		// Graph node label also renamed so the front-end's
		// componentNameToNodeTypeMap lookup ("Parallel" →
		// "parallelNode") succeeds.
		graph, _ := out["graph"].(map[string]any)
		if graph != nil {
			nodes, _ := graph["nodes"].([]any)
			for _, n := range nodes {
				nm, _ := n.(map[string]any)
				if nm == nil {
					continue
				}
				if nm["id"] != "Iteration:abc" {
					continue
				}
				if data, _ := nm["data"].(map[string]any); data != nil {
					if data["label"] != "Parallel" {
						t.Errorf("graph node data.label = %v, want Parallel", data["label"])
					}
				}
				if nm["type"] != "parallelNode" {
					t.Errorf("graph node type = %v, want parallelNode", nm["type"])
				}
			}
		}
	})
}

// TestNormalizeForRun_RewritesLegacyIterationAliases checks that body
// templates referencing the legacy IterationItem child
// ({IterationItem:def@index}, {IterationItem:def@item}) are rewritten
// to the bare {index} / {item} aliases by NormalizeForRun.
func TestNormalizeForRun_RewritesLegacyIterationAliases(t *testing.T) {
	in := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{"id": "Iteration:abc", "type": "iterationNode"},
				map[string]any{"id": "IterationItem:def", "type": "iterationStartNode", "parentId": "Iteration:abc"},
				map[string]any{"id": "Body:1", "type": "messageNode"},
			},
			"edges": []any{},
		},
		"components": map[string]any{
			"Iteration:abc": map[string]any{
				"obj": map[string]any{
					"component_name": "Iteration",
					"params": map[string]any{
						"items_ref": "sys.arr",
						"outputs": map[string]any{
							"lines": map[string]any{
								"ref":  "StringTransform:FmtItem@result",
								"type": "Array<string>",
							},
						},
					},
				},
				"downstream": []any{"IterationItem:def", "Done:1"},
			},
			"IterationItem:def": map[string]any{
				"obj":        map[string]any{"component_name": "IterationItem", "params": map[string]any{}},
				"downstream": []any{"Body:1"},
			},
			"Body:1": map[string]any{
				"obj": map[string]any{
					"component_name": "Message",
					"params": map[string]any{
						"content": []any{
							"{IterationItem:def@index}: {IterationItem:def@item}",
						},
					},
				},
				"downstream": []any{},
			},
			"Done:1": map[string]any{
				"obj": map[string]any{
					"component_name": "Message",
					"params": map[string]any{
						"content": []any{
							"done {Iteration:abc@lines}",
						},
					},
				},
				"downstream": []any{},
			},
		},
	}

	out := NormalizeForRun(in)
	comps, _ := out["components"].(map[string]any)
	body, _ := comps["Body:1"].(map[string]any)
	if body == nil {
		t.Fatal("Body:1 missing after normalize")
	}
	obj, _ := body["obj"].(map[string]any)
	params, _ := obj["params"].(map[string]any)
	content, _ := params["content"].([]any)
	if len(content) != 1 {
		t.Fatalf("content len = %d, want 1", len(content))
	}
	got, _ := content[0].(string)
	want := "{index}: {item}"
	if got != want {
		t.Fatalf("legacy iteration aliases: got %q, want %q", got, want)
	}
}

// TestNormalizeForCanvas_PreservesIterationCanvasShape pins the
// front-end protocol boundary: canvas-facing normalization may repair
// graph structure, but it must not rename Iteration to Parallel or
// emit the internal parallelNode type.
func TestNormalizeForCanvas_PreservesIterationCanvasShape(t *testing.T) {
	in := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{
					"id":   "Iteration:abc",
					"type": "iterationNode",
					"data": map[string]any{"label": "Iteration", "name": "Iteration"},
				},
				map[string]any{
					"id":       "IterationItem:def",
					"type":     "iterationStartNode",
					"parentId": "Iteration:abc",
					"data":     map[string]any{"label": "IterationItem", "name": "IterationItem"},
				},
			},
			"edges": []any{},
		},
		"components": map[string]any{
			"Iteration:abc": map[string]any{
				"obj":        map[string]any{"component_name": "Iteration", "params": map[string]any{"items_ref": "x"}},
				"downstream": []any{"IterationItem:def"},
			},
			"IterationItem:def": map[string]any{
				"obj":        map[string]any{"component_name": "IterationItem", "params": map[string]any{}},
				"downstream": []any{},
			},
		},
	}

	out := NormalizeForCanvas(in)
	comps, _ := out["components"].(map[string]any)
	parent, _ := comps["Iteration:abc"].(map[string]any)
	if parent == nil {
		t.Fatal("Iteration:abc missing after canvas normalize")
	}
	if obj, _ := parent["obj"].(map[string]any); obj != nil {
		if obj["component_name"] != "Iteration" {
			t.Errorf("canvas normalize renamed component_name = %v, want Iteration", obj["component_name"])
		}
	}
	if _, ok := comps["IterationItem:def"]; !ok {
		t.Error("canvas normalize folded away IterationItem:def; want preserved")
	}

	graph, _ := out["graph"].(map[string]any)
	nodes, _ := graph["nodes"].([]any)
	for _, n := range nodes {
		nm, _ := n.(map[string]any)
		if nm == nil || nm["id"] != "Iteration:abc" {
			continue
		}
		if nm["type"] != "iterationNode" {
			t.Errorf("canvas normalize graph node type = %v, want iterationNode", nm["type"])
		}
		if data, _ := nm["data"].(map[string]any); data != nil {
			if data["label"] != "Iteration" {
				t.Errorf("canvas normalize graph label = %v, want Iteration", data["label"])
			}
		}
	}
}

// TestNormalizeForCanvas_RepairsLeakedParallelShape verifies that a
// historically polluted stored DSL is repaired on read before it goes
// back to the front-end: internal Parallel / parallelNode must be
// mapped back to Iteration / iterationNode without mutating the input.
func TestNormalizeForCanvas_RepairsLeakedParallelShape(t *testing.T) {
	in := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{
					"id":   "Iteration:abc",
					"type": "parallelNode",
					"data": map[string]any{"label": "Parallel", "name": "Parallel"},
				},
			},
			"edges": []any{},
		},
		"components": map[string]any{
			"Iteration:abc": map[string]any{
				"obj": map[string]any{
					"component_name": "Parallel",
					"params":         map[string]any{},
				},
				"downstream": []any{},
				"upstream":   []any{},
			},
		},
	}

	out := NormalizeForCanvas(in)
	comps, _ := out["components"].(map[string]any)
	parent, _ := comps["Iteration:abc"].(map[string]any)
	obj, _ := parent["obj"].(map[string]any)
	if obj["component_name"] != "Iteration" {
		t.Errorf("component_name = %v, want Iteration", obj["component_name"])
	}

	graph, _ := out["graph"].(map[string]any)
	nodes, _ := graph["nodes"].([]any)
	node, _ := nodes[0].(map[string]any)
	if node["type"] != "iterationNode" {
		t.Errorf("graph node type = %v, want iterationNode", node["type"])
	}
	data, _ := node["data"].(map[string]any)
	if data["label"] != "Iteration" {
		t.Errorf("graph node label = %v, want Iteration", data["label"])
	}
	if data["name"] != "Iteration" {
		t.Errorf("graph node name = %v, want Iteration", data["name"])
	}

	origNode := in["graph"].(map[string]any)["nodes"].([]any)[0].(map[string]any)
	if origNode["type"] != "parallelNode" {
		t.Errorf("input node type mutated to %v", origNode["type"])
	}
}

// TestNormalize_DoesNotMutateInput pins the documented
// "never mutates its input" contract. The original DSL map's
// graph.edges[*].sourceHandle / targetHandle, components
// entries, and components[*].obj.component_name must all be
// unchanged after NormalizeForRun returns.
func TestNormalize_DoesNotMutateInput(t *testing.T) {
	in := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{"id": "begin", "type": "beginNode"},
			},
			"edges": []any{
				map[string]any{
					"id":           "e1",
					"source":       "begin",
					"target":       "begin",
					"sourceHandle": "end", // inverted; rewritten in the output
					"targetHandle": "start",
				},
			},
		},
		"components": map[string]any{
			"Iteration:abc": map[string]any{
				"obj": map[string]any{
					"component_name": "Iteration", // renamed to Parallel in the output
					"params":         map[string]any{"items_ref": "x"},
				},
				"downstream": []any{"IterationItem:def"},
			},
			"IterationItem:def": map[string]any{
				"obj": map[string]any{
					"component_name": "IterationItem",
					"params":         map[string]any{},
				},
				"downstream": []any{"Body:1"},
			},
		},
	}

	// Snapshot the original before NormalizeForRun runs.
	origEdge := in["graph"].(map[string]any)["edges"].([]any)[0].(map[string]any)
	origSourceHandle := origEdge["sourceHandle"]
	origTargetHandle := origEdge["targetHandle"]

	origIterObj := in["components"].(map[string]any)["Iteration:abc"].(map[string]any)["obj"].(map[string]any)
	origIterName := origIterObj["component_name"]

	iterItemKey := "IterationItem:def"
	hadIterItem := false
	if _, ok := in["components"].(map[string]any)[iterItemKey]; ok {
		hadIterItem = true
	}

	// Run the normalizer.
	_ = NormalizeForRun(in)

	// (1) graph.edges[*].sourceHandle / targetHandle must NOT be
	// rewritten in the original input.
	if got := origEdge["sourceHandle"]; got != origSourceHandle {
		t.Errorf("input edge sourceHandle mutated: %v -> %v", origSourceHandle, got)
	}
	if got := origEdge["targetHandle"]; got != origTargetHandle {
		t.Errorf("input edge targetHandle mutated: %v -> %v", origTargetHandle, got)
	}

	// (2) components[*].obj.component_name must NOT be renamed
	// in the original input.
	if got := origIterObj["component_name"]; got != origIterName {
		t.Errorf("input components[Itr].obj.component_name mutated: %v -> %v", origIterName, got)
	}

	// (3) The legacy child component must still be present in
	// the original components map (fold deletes from the COPY).
	if !hadIterItem {
		t.Errorf("input was missing IterationItem:def before normalize (fixture bug)")
	}
	if _, ok := in["components"].(map[string]any)[iterItemKey]; !ok {
		t.Errorf("input components[IterationItem:def] was deleted by fold; input was mutated")
	}
}

// TestNormalize_FixtureSmoke walks the on-disk testdata/ directory
// and runs every fixture through NormalizeForRun. The dsl_examples e2e
// suite enumerates the same fixture names; keeping the two lists in
// sync is the responsibility of internal/agent/canvas/dsl_examples_test.go,
// whose comment is the single source of truth.
func TestNormalize_FixtureSmoke(t *testing.T) {
	dir := fixturesDir(t)
	entries, err := os.ReadDir(dir)
	if err != nil {
		t.Skipf("testdata/ not readable: %v", err)
	}
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
			continue
		}
		name := e.Name()
		t.Run(name, func(t *testing.T) {
			raw := loadFixture(t, name)
			out := NormalizeForRun(raw)
			if out == nil {
				t.Fatalf("[%s] normalize returned nil", name)
			}
			// Must have either graph or components at minimum.
			_, hasGraph := out["graph"].(map[string]any)
			comps, hasComps := out["components"].(map[string]any)
			if !hasGraph && !hasComps {
				t.Errorf("[%s] normalize produced neither graph nor components", name)
			}
			// If components survived on the runtime-normalized view, no
			// Iteration / LoopItem / IterationItem may linger: the item
			// children are folded and Iteration is renamed.
			if hasComps {
				for id, rawComp := range comps {
					comp, _ := rawComp.(map[string]any)
					if comp == nil {
						continue
					}
					if obj, _ := comp["obj"].(map[string]any); obj != nil {
						switch obj["component_name"] {
						case "LoopItem", "IterationItem":
							t.Errorf("[%s] component %q still has legacy name %q", name, id, obj["component_name"])
						case "Iteration":
							t.Errorf("[%s] component %q still has pre-rename name %q", name, id, obj["component_name"])
						}
					}
				}
			}
		})
	}
}

// ----- helpers -----

func nodeIDs(t *testing.T, dsl map[string]any) []string {
	t.Helper()
	graph, _ := dsl["graph"].(map[string]any)
	if graph == nil {
		return nil
	}
	nodes, _ := graph["nodes"].([]any)
	ids := make([]string, 0, len(nodes))
	for _, n := range nodes {
		nm, _ := n.(map[string]any)
		if nm == nil {
			continue
		}
		if id, _ := nm["id"].(string); id != "" {
			ids = append(ids, id)
		}
	}
	return ids
}

func stringSliceAny(v []any) []string {
	out := make([]string, 0, len(v))
	for _, x := range v {
		if s, ok := x.(string); ok {
			out = append(out, s)
		}
	}
	return out
}

func equalStringSlices(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}