mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-01 00:05:43 +08:00
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
1000 lines
30 KiB
Go
1000 lines
30 KiB
Go
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
// loop_subgraph_test.go — table-driven tests for the Loop macro
|
|
// expansion helpers in loop_subgraph.go.
|
|
//
|
|
// Tests cover:
|
|
// - collectDescendants (DAG and diamond shapes, back-edge handling)
|
|
// - resolveInitialVariables (constant / zero-init / variable modes)
|
|
// - zeroValueForType (number / string / boolean / object* / array* / unknown)
|
|
// - readMaxLoopCount (missing, int, int64, float64)
|
|
// - translateLoopCondition (single op, AND/OR, invalid logical_operator,
|
|
// incomplete entries, empty conditions)
|
|
// - evalOneLoopCondition + evaluateCondition (operator dispatch on
|
|
// string / bool / number / dict / list / nil; the same operator
|
|
// set as agent/component/loopitem.py:48-122)
|
|
// - BuildWorkflow end-to-end (Loop + body, legacy ExitLoop no-op,
|
|
// unknown component error path)
|
|
|
|
package canvas
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
// ---- collectDescendants ----
|
|
|
|
func TestCollectLoopMembers_UsesGroupedChildren(t *testing.T) {
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"Loop:InputUntil1": {Obj: CanvasComponentObj{ComponentName: "Loop"}, Downstream: []string{"Message:LoopDone"}},
|
|
"LoopItem:InputUntil1Start": {Obj: CanvasComponentObj{ComponentName: "LoopItem"}},
|
|
"UserFillUp:LoopInput": {Obj: CanvasComponentObj{ComponentName: "UserFillUp"}},
|
|
"Switch:LoopCheck": {Obj: CanvasComponentObj{ComponentName: "Switch"}},
|
|
"ExitLoop:LoopExit": {Obj: CanvasComponentObj{ComponentName: "ExitLoop"}},
|
|
"Message:LoopContinue": {Obj: CanvasComponentObj{ComponentName: "Message"}},
|
|
"Message:LoopDone": {Obj: CanvasComponentObj{ComponentName: "Message"}},
|
|
},
|
|
NodeParents: map[string]string{
|
|
"LoopItem:InputUntil1Start": "Loop:InputUntil1",
|
|
"UserFillUp:LoopInput": "Loop:InputUntil1",
|
|
"Switch:LoopCheck": "Loop:InputUntil1",
|
|
"ExitLoop:LoopExit": "Loop:InputUntil1",
|
|
"Message:LoopContinue": "Loop:InputUntil1",
|
|
},
|
|
}
|
|
|
|
got := collectLoopMembers(c, "Loop:InputUntil1")
|
|
for _, want := range []string{
|
|
"LoopItem:InputUntil1Start",
|
|
"UserFillUp:LoopInput",
|
|
"Switch:LoopCheck",
|
|
"ExitLoop:LoopExit",
|
|
"Message:LoopContinue",
|
|
} {
|
|
if !got[want] {
|
|
t.Fatalf("grouped loop member %q missing from %v", want, got)
|
|
}
|
|
}
|
|
if got["Message:LoopDone"] {
|
|
t.Fatalf("outer follower should not be a loop member: %v", got)
|
|
}
|
|
}
|
|
|
|
func TestCollectDescendants_DAG(t *testing.T) {
|
|
// 4-node chain: loop -> a -> b -> c -> d (d has no downstream).
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop"},
|
|
Downstream: []string{"a"}},
|
|
"a": {Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"loop"}, Downstream: []string{"b"}},
|
|
"b": {Obj: CanvasComponentObj{ComponentName: "LLM"},
|
|
Upstream: []string{"a"}, Downstream: []string{"c"}},
|
|
"c": {Obj: CanvasComponentObj{ComponentName: "Categorize"},
|
|
Upstream: []string{"b"}, Downstream: []string{"d"}},
|
|
"d": {Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"c"}},
|
|
},
|
|
}
|
|
got := collectDescendants(c, "loop")
|
|
want := map[string]bool{"a": true, "b": true, "c": true, "d": true}
|
|
if len(got) != len(want) {
|
|
t.Fatalf("got %v, want %v", got, want)
|
|
}
|
|
for k := range want {
|
|
if !got[k] {
|
|
t.Errorf("missing %q in %v", k, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestCollectDescendants_Diamond(t *testing.T) {
|
|
// loop -> a -> b -> d
|
|
// \-> c -/
|
|
// d is the join, must appear once.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop"},
|
|
Downstream: []string{"a"}},
|
|
"a": {Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"loop"}, Downstream: []string{"b", "c"}},
|
|
"b": {Obj: CanvasComponentObj{ComponentName: "LLM"},
|
|
Upstream: []string{"a"}, Downstream: []string{"d"}},
|
|
"c": {Obj: CanvasComponentObj{ComponentName: "Categorize"},
|
|
Upstream: []string{"a"}, Downstream: []string{"d"}},
|
|
"d": {Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"b", "c"}},
|
|
},
|
|
}
|
|
got := collectDescendants(c, "loop")
|
|
want := map[string]bool{"a": true, "b": true, "c": true, "d": true}
|
|
if len(got) != len(want) {
|
|
t.Fatalf("got %v, want %v", got, want)
|
|
}
|
|
for k := range want {
|
|
if !got[k] {
|
|
t.Errorf("missing %q in %v", k, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestCollectDescendants_BackEdgeStops(t *testing.T) {
|
|
// loop -> a -> b -> loop (back-edge). BFS must not loop forever;
|
|
// visited stops at the back-edge.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop"},
|
|
Downstream: []string{"a"}},
|
|
"a": {Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"loop"}, Downstream: []string{"b"}},
|
|
"b": {Obj: CanvasComponentObj{ComponentName: "LLM"},
|
|
Upstream: []string{"a"}, Downstream: []string{"loop"}},
|
|
},
|
|
}
|
|
got := collectDescendants(c, "loop")
|
|
want := map[string]bool{"a": true, "b": true}
|
|
if len(got) != len(want) {
|
|
t.Fatalf("got %v, want %v", got, want)
|
|
}
|
|
}
|
|
|
|
// ---- resolveInitialVariables ----
|
|
|
|
func TestResolveInitialVariables_Constant(t *testing.T) {
|
|
params := map[string]any{
|
|
"loop_variables": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"input_mode": "constant",
|
|
"value": 7,
|
|
"type": "number",
|
|
},
|
|
},
|
|
}
|
|
got, err := resolveInitialVariables(params)
|
|
if err != nil {
|
|
t.Fatalf("resolveInitialVariables: %v", err)
|
|
}
|
|
spec, ok := got["counter"]
|
|
if !ok {
|
|
t.Fatalf("counter: missing key in result map")
|
|
}
|
|
if spec.InputMode != "constant" {
|
|
t.Errorf("counter: input_mode got %q, want \"constant\"", spec.InputMode)
|
|
}
|
|
if spec.Value != 7 {
|
|
t.Errorf("counter: value got %v, want 7", spec.Value)
|
|
}
|
|
}
|
|
|
|
func TestResolveInitialVariables_ZeroInit(t *testing.T) {
|
|
cases := []struct {
|
|
typ string
|
|
want any
|
|
}{
|
|
{"number", 0},
|
|
{"string", ""},
|
|
{"boolean", false},
|
|
{"object", map[string]any{}},
|
|
{"object<string>", map[string]any{}},
|
|
{"array", []any{}},
|
|
{"array<string>", []any{}},
|
|
{"unknown-type", ""},
|
|
}
|
|
for _, tc := range cases {
|
|
params := map[string]any{
|
|
"loop_variables": []any{
|
|
map[string]any{
|
|
"variable": "v",
|
|
"input_mode": "",
|
|
"value": nil,
|
|
"type": tc.typ,
|
|
},
|
|
},
|
|
}
|
|
got, err := resolveInitialVariables(params)
|
|
if err != nil {
|
|
t.Fatalf("typ %q: %v", tc.typ, err)
|
|
}
|
|
spec, ok := got["v"]
|
|
if !ok {
|
|
t.Fatalf("typ %q: missing key in result map", tc.typ)
|
|
}
|
|
// Special-case the untyped-empty value to skip the equal check
|
|
// on slices/maps (reflect.DeepEqual semantics).
|
|
if !valueEqual(spec.Value, tc.want) {
|
|
t.Errorf("typ %q: got %v (%T), want %v (%T)", tc.typ, spec.Value, spec.Value, tc.want, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestResolveInitialVariables_VariablePassthrough(t *testing.T) {
|
|
// "variable" mode's runtime dereference happens in the init lambda
|
|
// (buildSubWorkflow). resolveInitialVariables is state-free, so it
|
|
// just returns the ref string in Value plus the input_mode tag so
|
|
// the init lambda knows to dereference.
|
|
params := map[string]any{
|
|
"loop_variables": []any{
|
|
map[string]any{
|
|
"variable": "x",
|
|
"input_mode": "variable",
|
|
"value": "Begin.foo",
|
|
"type": "string",
|
|
},
|
|
},
|
|
}
|
|
got, err := resolveInitialVariables(params)
|
|
if err != nil {
|
|
t.Fatalf("resolveInitialVariables: %v", err)
|
|
}
|
|
spec, ok := got["x"]
|
|
if !ok {
|
|
t.Fatalf("x: missing key in result map")
|
|
}
|
|
if spec.InputMode != "variable" {
|
|
t.Errorf("x: input_mode got %q, want \"variable\"", spec.InputMode)
|
|
}
|
|
if spec.Value != "Begin.foo" {
|
|
t.Errorf("x: value got %v, want \"Begin.foo\"", spec.Value)
|
|
}
|
|
}
|
|
|
|
func TestResolveInitialVariables_Incomplete(t *testing.T) {
|
|
cases := []map[string]any{
|
|
// missing 'variable'
|
|
{"input_mode": "constant", "value": 1, "type": "number"},
|
|
// missing 'input_mode'
|
|
{"variable": "x", "value": 1, "type": "number"},
|
|
// missing 'value'
|
|
{"variable": "x", "input_mode": "constant", "type": "number"},
|
|
// missing 'type'
|
|
{"variable": "x", "input_mode": "constant", "value": 1},
|
|
}
|
|
for i, item := range cases {
|
|
params := map[string]any{"loop_variables": []any{item}}
|
|
if _, err := resolveInitialVariables(params); err == nil {
|
|
t.Errorf("case %d: expected error, got nil", i)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---- zeroValueForType ----
|
|
|
|
func TestZeroValueForType(t *testing.T) {
|
|
cases := []struct {
|
|
typ any
|
|
want any
|
|
}{
|
|
{"number", 0},
|
|
{"string", ""},
|
|
{"boolean", false},
|
|
{"object", map[string]any{}},
|
|
{"object<string>", map[string]any{}},
|
|
{"array", []any{}},
|
|
{"array<string>", []any{}},
|
|
{"weird", ""},
|
|
{nil, ""},
|
|
}
|
|
for _, tc := range cases {
|
|
got := zeroValueForType(tc.typ)
|
|
if !valueEqual(got, tc.want) {
|
|
t.Errorf("typ %v: got %v, want %v", tc.typ, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---- readMaxLoopCount ----
|
|
|
|
func TestReadMaxLoopCount(t *testing.T) {
|
|
cases := []struct {
|
|
name string
|
|
in map[string]any
|
|
want int
|
|
}{
|
|
{"missing", map[string]any{}, 0},
|
|
{"int", map[string]any{"maximum_loop_count": 5}, 5},
|
|
{"int64", map[string]any{"maximum_loop_count": int64(7)}, 7},
|
|
{"float64", map[string]any{"maximum_loop_count": 3.0}, 3},
|
|
{"string", map[string]any{"maximum_loop_count": "5"}, 0},
|
|
}
|
|
for _, tc := range cases {
|
|
if got := readMaxLoopCount(tc.in); got != tc.want {
|
|
t.Errorf("%s: got %d, want %d", tc.name, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---- translateLoopCondition ----
|
|
|
|
func TestTranslateLoopCondition_SingleOp(t *testing.T) {
|
|
params := map[string]any{
|
|
"logical_operator": "and",
|
|
"loop_termination_condition": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"operator": "≥",
|
|
"value": 3,
|
|
"input_mode": "constant",
|
|
},
|
|
},
|
|
}
|
|
cond, err := translateLoopCondition("loop_0", params)
|
|
if err != nil {
|
|
t.Fatalf("translateLoopCondition: %v", err)
|
|
}
|
|
state := NewCanvasState("", "")
|
|
state.SetVar("loop_0", "counter", 3)
|
|
ctx := WithState(context.Background(), state)
|
|
quit, err := cond(ctx, 3, nil, nil)
|
|
if err != nil {
|
|
t.Fatalf("cond: %v", err)
|
|
}
|
|
if !quit {
|
|
t.Errorf("expected quit when counter=3 >= 3")
|
|
}
|
|
// counter=2 should NOT quit.
|
|
state2 := NewCanvasState("", "")
|
|
state2.SetVar("loop_0", "counter", 2)
|
|
ctx2 := WithState(context.Background(), state2)
|
|
quit, err = cond(ctx2, 2, nil, nil)
|
|
if err != nil {
|
|
t.Fatalf("cond: %v", err)
|
|
}
|
|
if quit {
|
|
t.Errorf("expected no-quit when counter=2 < 3")
|
|
}
|
|
}
|
|
|
|
func TestTranslateLoopCondition_OrQuitsEarly(t *testing.T) {
|
|
// Two conditions OR'd. quits as soon as one is true.
|
|
params := map[string]any{
|
|
"logical_operator": "or",
|
|
"loop_termination_condition": []any{
|
|
map[string]any{"variable": "a", "operator": "=", "value": 1, "input_mode": "constant"},
|
|
map[string]any{"variable": "b", "operator": "=", "value": 2, "input_mode": "constant"},
|
|
},
|
|
}
|
|
cond, err := translateLoopCondition("L", params)
|
|
if err != nil {
|
|
t.Fatalf("translate: %v", err)
|
|
}
|
|
// a=1, b=0 → quits (first condition true).
|
|
state := NewCanvasState("", "")
|
|
state.SetVar("L", "a", 1)
|
|
state.SetVar("L", "b", 0)
|
|
quit, err := cond(WithState(context.Background(), state), 1, nil, nil)
|
|
if err != nil {
|
|
t.Fatalf("cond: %v", err)
|
|
}
|
|
if !quit {
|
|
t.Errorf("OR with a=1 should quit")
|
|
}
|
|
// a=0, b=2 → quits (second condition true).
|
|
state2 := NewCanvasState("", "")
|
|
state2.SetVar("L", "a", 0)
|
|
state2.SetVar("L", "b", 2)
|
|
quit, err = cond(WithState(context.Background(), state2), 1, nil, nil)
|
|
if err != nil {
|
|
t.Fatalf("cond: %v", err)
|
|
}
|
|
if !quit {
|
|
t.Errorf("OR with b=2 should quit")
|
|
}
|
|
// a=0, b=0 → no quit.
|
|
state3 := NewCanvasState("", "")
|
|
state3.SetVar("L", "a", 0)
|
|
state3.SetVar("L", "b", 0)
|
|
quit, err = cond(WithState(context.Background(), state3), 1, nil, nil)
|
|
if err != nil {
|
|
t.Fatalf("cond: %v", err)
|
|
}
|
|
if quit {
|
|
t.Errorf("OR with both 0 should not quit")
|
|
}
|
|
}
|
|
|
|
func TestTranslateLoopCondition_AndRequiresAll(t *testing.T) {
|
|
params := map[string]any{
|
|
"loop_termination_condition": []any{
|
|
map[string]any{"variable": "a", "operator": "=", "value": 1, "input_mode": "constant"},
|
|
map[string]any{"variable": "b", "operator": "=", "value": 2, "input_mode": "constant"},
|
|
},
|
|
}
|
|
cond, err := translateLoopCondition("L", params)
|
|
if err != nil {
|
|
t.Fatalf("translate: %v", err)
|
|
}
|
|
// a=1, b=2 → quits.
|
|
state := NewCanvasState("", "")
|
|
state.SetVar("L", "a", 1)
|
|
state.SetVar("L", "b", 2)
|
|
quit, _ := cond(WithState(context.Background(), state), 1, nil, nil)
|
|
if !quit {
|
|
t.Errorf("AND with both true should quit")
|
|
}
|
|
// a=1, b=0 → no quit (default logical_op is "and").
|
|
state2 := NewCanvasState("", "")
|
|
state2.SetVar("L", "a", 1)
|
|
state2.SetVar("L", "b", 0)
|
|
quit, _ = cond(WithState(context.Background(), state2), 1, nil, nil)
|
|
if quit {
|
|
t.Errorf("AND with one false should not quit")
|
|
}
|
|
}
|
|
|
|
func TestTranslateLoopCondition_EmptyConditionsNeverQuit(t *testing.T) {
|
|
params := map[string]any{
|
|
"logical_operator": "and",
|
|
}
|
|
cond, err := translateLoopCondition("L", params)
|
|
if err != nil {
|
|
t.Fatalf("translate: %v", err)
|
|
}
|
|
state := NewCanvasState("", "")
|
|
quit, err := cond(WithState(context.Background(), state), 1, nil, nil)
|
|
if err != nil {
|
|
t.Fatalf("cond: %v", err)
|
|
}
|
|
if quit {
|
|
t.Errorf("empty conditions must never quit (max count is the only terminator)")
|
|
}
|
|
}
|
|
|
|
func TestTranslateLoopCondition_InvalidLogicalOp(t *testing.T) {
|
|
params := map[string]any{
|
|
"logical_operator": "xor",
|
|
}
|
|
if _, err := translateLoopCondition("L", params); err == nil {
|
|
t.Errorf("expected error on invalid logical_operator")
|
|
}
|
|
}
|
|
|
|
func TestTranslateLoopCondition_IncompleteEntry(t *testing.T) {
|
|
cases := []map[string]any{
|
|
{"operator": "=", "value": 1}, // missing variable
|
|
{"variable": "x"}, // missing operator
|
|
{"variable": "x", "operator": ""}, // empty operator
|
|
}
|
|
for i, item := range cases {
|
|
params := map[string]any{
|
|
"loop_termination_condition": []any{item},
|
|
}
|
|
if _, err := translateLoopCondition("L", params); err == nil {
|
|
t.Errorf("case %d: expected error on incomplete entry", i)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestTranslateLoopCondition_VariableInputMode(t *testing.T) {
|
|
// condition's value input_mode is "variable" → resolve the value ref
|
|
// from state before applying the operator.
|
|
params := map[string]any{
|
|
"loop_termination_condition": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"operator": "≥",
|
|
"value": "Begin@threshold",
|
|
"input_mode": "variable",
|
|
},
|
|
},
|
|
}
|
|
cond, err := translateLoopCondition("L", params)
|
|
if err != nil {
|
|
t.Fatalf("translate: %v", err)
|
|
}
|
|
state := NewCanvasState("", "")
|
|
state.SetVar("L", "counter", 10)
|
|
state.SetVar("Begin", "threshold", 5)
|
|
quit, _ := cond(WithState(context.Background(), state), 1, nil, nil)
|
|
if !quit {
|
|
t.Errorf("counter(10) >= threshold(5) should quit")
|
|
}
|
|
}
|
|
|
|
// ---- evaluateCondition operator dispatch ----
|
|
|
|
func TestEvaluateCondition_StringOps(t *testing.T) {
|
|
cases := []struct {
|
|
op string
|
|
value any
|
|
want bool
|
|
}{
|
|
{"contains", "ell", true},
|
|
{"not contains", "zzz", true},
|
|
{"start with", "hel", true},
|
|
{"end with", "llo", true},
|
|
{"is", "hello", true},
|
|
{"is not", "world", true},
|
|
{"empty", nil, false}, // "hello" != ""
|
|
{"not empty", nil, true},
|
|
}
|
|
for _, tc := range cases {
|
|
got, err := evaluateCondition("hello", tc.op, tc.value)
|
|
if err != nil {
|
|
t.Errorf("op=%s: %v", tc.op, err)
|
|
continue
|
|
}
|
|
if got != tc.want {
|
|
t.Errorf("op=%s: got %v, want %v", tc.op, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestEvaluateCondition_NumberOps(t *testing.T) {
|
|
cases := []struct {
|
|
op string
|
|
value any
|
|
want bool
|
|
}{
|
|
{"=", 5, true},
|
|
{"≠", 6, true},
|
|
{">", 4, true},
|
|
{"<", 6, true},
|
|
{"≥", 5, true},
|
|
{"≤", 5, true},
|
|
}
|
|
for _, tc := range cases {
|
|
got, err := evaluateCondition(5, tc.op, tc.value)
|
|
if err != nil {
|
|
t.Errorf("op=%s: %v", tc.op, err)
|
|
continue
|
|
}
|
|
if got != tc.want {
|
|
t.Errorf("op=%s: got %v, want %v", tc.op, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestEvaluateCondition_InvalidOp(t *testing.T) {
|
|
if _, err := evaluateCondition("hello", "bogus", "x"); err == nil {
|
|
t.Errorf("expected error on unknown operator")
|
|
}
|
|
}
|
|
|
|
// ---- BuildWorkflow end-to-end (with a Loop cpn) ----
|
|
|
|
func TestBuildWorkflow_LoopInstallsOneNode(t *testing.T) {
|
|
// DSL: Begin -> Loop -> Message
|
|
// The Loop has no real body, so its sub-graph is just the
|
|
// synthetic init lambda. The outer workflow should have 3
|
|
// eino nodes total: Begin, the loop node, Message.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"loop"}},
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop",
|
|
Params: map[string]any{
|
|
"loop_variables": []any{},
|
|
}},
|
|
Upstream: []string{"begin"}, Downstream: []string{"msg"}},
|
|
"msg": {Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"loop"}},
|
|
},
|
|
}
|
|
if _, err := BuildWorkflow(context.Background(), c); err != nil {
|
|
t.Fatalf("BuildWorkflow: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_LegacyExitLoop(t *testing.T) {
|
|
// DSL with a standalone "ExitLoop" node. The Go port has no
|
|
// implementation for it, but legacyNoOpNames accepts it as a
|
|
// no-op echo node. BuildWorkflow must succeed.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"exit"}},
|
|
"exit": {Obj: CanvasComponentObj{ComponentName: "ExitLoop"},
|
|
Upstream: []string{"begin"}},
|
|
},
|
|
}
|
|
if _, err := BuildWorkflow(context.Background(), c); err != nil {
|
|
t.Fatalf("BuildWorkflow with ExitLoop: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_LoopExitLoopDoesNotBecomeTerminal(t *testing.T) {
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {
|
|
Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"loop"},
|
|
},
|
|
"loop": {
|
|
Obj: CanvasComponentObj{
|
|
ComponentName: "Loop",
|
|
Params: map[string]any{
|
|
"logical_operator": "and",
|
|
"loop_termination_condition": []any{
|
|
map[string]any{
|
|
"input_mode": "constant",
|
|
"operator": "is",
|
|
"value": "1",
|
|
"variable": "UserFillUp:LoopInput@value",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
Downstream: []string{"done"},
|
|
Upstream: []string{"begin"},
|
|
},
|
|
"loop_item": {
|
|
Obj: CanvasComponentObj{ComponentName: "IterationItem"},
|
|
Downstream: []string{"input"},
|
|
Upstream: []string{"loop"},
|
|
},
|
|
"input": {
|
|
Obj: CanvasComponentObj{ComponentName: "UserFillUp", Params: map[string]any{"inputs": map[string]any{"value": map[string]any{"type": "line"}}}},
|
|
Downstream: []string{"check"},
|
|
Upstream: []string{"loop_item"},
|
|
},
|
|
"check": {
|
|
Obj: CanvasComponentObj{
|
|
ComponentName: "Switch",
|
|
Params: map[string]any{
|
|
"conditions": []any{
|
|
map[string]any{
|
|
"logical_operator": "and",
|
|
"items": []any{
|
|
map[string]any{"cpn_id": "UserFillUp:LoopInput@value", "operator": "=", "value": "1"},
|
|
},
|
|
"to": []any{"exit"},
|
|
},
|
|
},
|
|
"end_cpn_ids": []any{"continue"},
|
|
},
|
|
},
|
|
Downstream: []string{"exit", "continue"},
|
|
Upstream: []string{"input"},
|
|
},
|
|
"exit": {
|
|
Obj: CanvasComponentObj{ComponentName: "ExitLoop"},
|
|
Upstream: []string{"check"},
|
|
},
|
|
"continue": {
|
|
Obj: CanvasComponentObj{ComponentName: "Message", Params: map[string]any{"content": []any{"continue"}}},
|
|
Upstream: []string{"check"},
|
|
},
|
|
"done": {
|
|
Obj: CanvasComponentObj{ComponentName: "Message", Params: map[string]any{"content": []any{"done"}}},
|
|
Upstream: []string{"loop"},
|
|
},
|
|
},
|
|
NodeParents: map[string]string{
|
|
"loop_item": "loop",
|
|
"input": "loop",
|
|
"check": "loop",
|
|
"exit": "loop",
|
|
"continue": "loop",
|
|
},
|
|
}
|
|
if _, err := BuildWorkflow(context.Background(), c); err != nil {
|
|
t.Fatalf("BuildWorkflow with grouped loop ExitLoop: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_UnknownComponentErrors(t *testing.T) {
|
|
// A component name that is neither in legacyNoOpNames nor in the
|
|
// isKnownPrimitive allowlist must produce a clear error from
|
|
// BuildWorkflow. Silent acceptance would mask DSL typos until the
|
|
// workflow failed at runtime.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"bogus"}},
|
|
"bogus": {Obj: CanvasComponentObj{ComponentName: "FakeComponent"},
|
|
Upstream: []string{"begin"}},
|
|
},
|
|
}
|
|
_, err := BuildWorkflow(context.Background(), c)
|
|
if err == nil {
|
|
t.Fatal("expected error on unknown component name, got nil")
|
|
}
|
|
// The error must mention the cpn_id AND the offending name so the
|
|
// orchestrator can surface an actionable diagnostic.
|
|
if !strings.Contains(err.Error(), "bogus") || !strings.Contains(err.Error(), "FakeComponent") {
|
|
t.Errorf("error should name both cpn and component; got: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_EmptyComponentNameErrors(t *testing.T) {
|
|
// A component with an empty component_name is a DSL bug. BuildWorkflow
|
|
// must reject it rather than passing through to the placeholder path.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"empty"}},
|
|
"empty": {Obj: CanvasComponentObj{ComponentName: ""},
|
|
Upstream: []string{"begin"}},
|
|
},
|
|
}
|
|
_, err := BuildWorkflow(context.Background(), c)
|
|
if err == nil {
|
|
t.Fatal("expected error on empty component_name, got nil")
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_LoopSharesOuterCanvasState(t *testing.T) {
|
|
// State-sharing contract: the Loop's sub-graph and the outer
|
|
// workflow must operate on the SAME *CanvasState instance. eino
|
|
// nests Workflows by composition — if the outer's WithGenLocalState
|
|
// is bypassed at the lambda boundary, the sub-workflow would not
|
|
// see loop variables and the loop could never terminate.
|
|
//
|
|
// The buildSubWorkflow init lambda writes
|
|
// state.Outputs[loopID][varName]; the LoopCondition closure
|
|
// reads the same slot via state.GetVar. For this to round-trip
|
|
// the two paths must share the same *CanvasState.
|
|
//
|
|
// We verify the contract at two levels:
|
|
//
|
|
// 1. structural: buildLoopExpansion / buildSubWorkflow must
|
|
// not clone or shadow state in their helpers, and the
|
|
// returned sub-workflow must be non-nil.
|
|
// 2. runtime: we attach a *CanvasState to ctx via WithState,
|
|
// replay the init lambda's body manually (it is a single
|
|
// GetStateFromContext + SetVar pair), and read it back via
|
|
// GetVar to confirm the SAME instance is observable from
|
|
// both sides.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"loop"}},
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop",
|
|
Params: map[string]any{
|
|
"loop_variables": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"input_mode": "constant",
|
|
"value": 0,
|
|
"type": "number",
|
|
},
|
|
},
|
|
"loop_termination_condition": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"operator": "≥",
|
|
"value": 3,
|
|
"input_mode": "constant",
|
|
},
|
|
},
|
|
}},
|
|
Upstream: []string{"begin"}},
|
|
},
|
|
}
|
|
exp, err := buildLoopExpansion(context.Background(), c, "loop")
|
|
if err != nil {
|
|
t.Fatalf("buildLoopExpansion: %v", err)
|
|
}
|
|
if exp.Sub == nil {
|
|
t.Fatal("sub-workflow is nil")
|
|
}
|
|
// Empty body — the loop has no descendants, so Members is empty
|
|
// and MaxIters defaults to 0 (= unbounded, condition-driven).
|
|
if exp.Members["begin"] {
|
|
t.Errorf("'begin' should NOT be a member of the loop's sub-graph")
|
|
}
|
|
if exp.MaxIters != 0 {
|
|
t.Errorf("MaxIters: got %d, want 0 (default = unbounded)", exp.MaxIters)
|
|
}
|
|
|
|
// Runtime contract: attach a state to ctx, run the same
|
|
// GetStateFromContext + SetVar sequence the init lambda
|
|
// performs, and confirm the mutation is visible to a
|
|
// LoopCondition-style reader on the SAME *CanvasState.
|
|
state := NewCanvasState("run-1", "task-1")
|
|
ctx := WithState(context.Background(), state)
|
|
|
|
got, _, err := GetStateFromContext[*CanvasState](ctx)
|
|
if err != nil {
|
|
t.Fatalf("GetStateFromContext: %v", err)
|
|
}
|
|
if got != state {
|
|
t.Errorf("GetStateFromContext returned a different *CanvasState instance")
|
|
}
|
|
// The init lambda writes "loop@counter" = 0.
|
|
got.SetVar("loop", "counter", 0)
|
|
// A LoopCondition closure would read it back via state.GetVar.
|
|
v, err := state.GetVar("loop@counter")
|
|
if err != nil {
|
|
t.Fatalf("GetVar: %v", err)
|
|
}
|
|
if v != 0 {
|
|
t.Errorf("counter: got %v, want 0 (init lambda should seed it)", v)
|
|
}
|
|
// The reader and writer MUST be the same instance — a clone
|
|
// would mean the loop's "update counter, check counter" cycle
|
|
// would never converge.
|
|
if got != state {
|
|
t.Errorf("state was cloned somewhere — writer and reader see different instances")
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_LoopWithBody(t *testing.T) {
|
|
// DSL: Begin -> Loop -> A -> B
|
|
// A and B are body members of the Loop's sub-graph.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"loop"}},
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop",
|
|
Params: map[string]any{
|
|
"loop_variables": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"input_mode": "constant",
|
|
"value": 0,
|
|
"type": "number",
|
|
},
|
|
},
|
|
"loop_termination_condition": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"operator": "≥",
|
|
"value": 3,
|
|
"input_mode": "constant",
|
|
},
|
|
},
|
|
"maximum_loop_count": 10,
|
|
}},
|
|
Upstream: []string{"begin"}, Downstream: []string{"a"}},
|
|
"a": {Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"loop"}, Downstream: []string{"b"}},
|
|
"b": {Obj: CanvasComponentObj{ComponentName: "LLM"},
|
|
Upstream: []string{"a"}},
|
|
},
|
|
}
|
|
if _, err := BuildWorkflow(context.Background(), c); err != nil {
|
|
t.Fatalf("BuildWorkflow: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_LoopBodyWithMultiTerminalCompiles(t *testing.T) {
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {
|
|
Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"loop"},
|
|
},
|
|
"loop": {
|
|
Obj: CanvasComponentObj{
|
|
ComponentName: "Loop",
|
|
Params: map[string]any{
|
|
"loop_variables": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"input_mode": "constant",
|
|
"value": 0,
|
|
"type": "number",
|
|
},
|
|
},
|
|
"loop_termination_condition": []any{
|
|
map[string]any{
|
|
"variable": "counter",
|
|
"operator": "≥",
|
|
"value": 1,
|
|
"input_mode": "constant",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
Upstream: []string{"begin"},
|
|
Downstream: []string{"branch"},
|
|
},
|
|
"branch": {
|
|
Obj: CanvasComponentObj{ComponentName: "Categorize"},
|
|
Upstream: []string{"loop"},
|
|
Downstream: []string{"left", "right"},
|
|
},
|
|
"left": {
|
|
Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"branch"},
|
|
},
|
|
"right": {
|
|
Obj: CanvasComponentObj{ComponentName: "Message"},
|
|
Upstream: []string{"branch"},
|
|
},
|
|
},
|
|
}
|
|
|
|
if _, err := BuildWorkflow(context.Background(), c); err != nil {
|
|
t.Fatalf("BuildWorkflow with loop multi-terminal body: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_LoopMissingParams(t *testing.T) {
|
|
// A Loop with no params at all — empty loop_variables and empty
|
|
// loop_termination_condition. The macro expansion should still
|
|
// succeed (the condition closure becomes a never-quit predicate,
|
|
// the init lambda writes nothing).
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"loop"}},
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop",
|
|
Params: map[string]any{}},
|
|
Upstream: []string{"begin"}},
|
|
},
|
|
}
|
|
if _, err := BuildWorkflow(context.Background(), c); err != nil {
|
|
t.Fatalf("BuildWorkflow: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBuildWorkflow_LoopIncompleteCondition(t *testing.T) {
|
|
// A Loop with a malformed condition entry. BuildWorkflow must
|
|
// surface the error from translateLoopCondition.
|
|
c := &Canvas{
|
|
Components: map[string]CanvasComponent{
|
|
"begin": {Obj: CanvasComponentObj{ComponentName: "Begin"},
|
|
Downstream: []string{"loop"}},
|
|
"loop": {Obj: CanvasComponentObj{ComponentName: "Loop",
|
|
Params: map[string]any{
|
|
"loop_termination_condition": []any{
|
|
map[string]any{"operator": "=", "value": 1}, // missing variable
|
|
},
|
|
}},
|
|
Upstream: []string{"begin"}},
|
|
},
|
|
}
|
|
if _, err := BuildWorkflow(context.Background(), c); err == nil {
|
|
t.Errorf("expected error on incomplete condition")
|
|
}
|
|
}
|
|
|
|
// ---- valueEqual: reflect.DeepEqual except for untyped nil vs typed nil ----
|
|
|
|
func valueEqual(a, b any) bool {
|
|
if a == nil && b == nil {
|
|
return true
|
|
}
|
|
if a == nil || b == nil {
|
|
return false
|
|
}
|
|
// Use type-aware comparison for maps and slices to handle the
|
|
// case where one side is nil-typed and the other is the zero
|
|
// value.
|
|
switch av := a.(type) {
|
|
case map[string]any:
|
|
bv, ok := b.(map[string]any)
|
|
if !ok || len(av) != len(bv) {
|
|
return false
|
|
}
|
|
for k, v := range av {
|
|
if !valueEqual(v, bv[k]) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
case []any:
|
|
bv, ok := b.([]any)
|
|
if !ok || len(av) != len(bv) {
|
|
return false
|
|
}
|
|
for i := range av {
|
|
if !valueEqual(av[i], bv[i]) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
return a == b
|
|
}
|