mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` now follows the
same semantics as its Python counterpart: variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
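The list above says `Switch` covers 12/12 operators with Python-equivalent semantics but does not spell the operators out. The following is a rough sketch of what that can look like in Go. The operator spellings (`contains`, `not contains`, `start with`, `end with`, `empty`, `not empty`, `=`, `≠`, `>`, `<`, `≥`, `≤`) and the rules used here are our reading of Python's Switch component, not something this PR states. Those rules are case-insensitive substring checks and a numeric comparison that falls back to a string comparison. `evalOperator` and `compare` are hypothetical names, and the sketch is simplified to string inputs.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// evalOperator is a hypothetical helper. Operator spellings and comparison
// rules are assumptions modeled on Python's Switch, simplified to strings.
func evalOperator(input, operator, value string) (bool, error) {
	in, val := strings.ToLower(input), strings.ToLower(value)
	switch operator {
	case "contains":
		return strings.Contains(in, val), nil
	case "not contains":
		return !strings.Contains(in, val), nil
	case "start with":
		return strings.HasPrefix(in, val), nil
	case "end with":
		return strings.HasSuffix(in, val), nil
	case "empty":
		return input == "", nil
	case "not empty":
		return input != "", nil
	case "=":
		return input == value, nil
	case "≠":
		return input != value, nil
	case ">", "<", "≥", "≤":
		return compare(input, operator, value), nil
	}
	return false, fmt.Errorf("unsupported operator %q", operator)
}

// compare tries a numeric comparison first and falls back to a
// lexicographic string comparison when either side is not a number.
func compare(input, operator, value string) bool {
	c := strings.Compare(input, value)
	a, errA := strconv.ParseFloat(strings.TrimSpace(input), 64)
	b, errB := strconv.ParseFloat(strings.TrimSpace(value), 64)
	if errA == nil && errB == nil {
		switch {
		case a > b:
			c = 1
		case a < b:
			c = -1
		default:
			c = 0
		}
	}
	switch operator {
	case ">":
		return c > 0
	case "<":
		return c < 0
	case "≥":
		return c >= 0
	default: // "≤"
		return c <= 0
	}
}

func main() {
	ok, _ := evalOperator("Hello World", "contains", "world")
	fmt.Println(ok) // true (case-insensitive)
	ok, _ = evalOperator("10", ">", "9")
	fmt.Println(ok) // true (numeric; as plain strings "10" < "9")
}
```

The numeric-first fallback is the part most worth pinning in a parity test: a pure string comparison would route `"10" > "9"` to the wrong branch.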
### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform eino `tool.InvokableTool` interface, SSRF protection, shared
HTTP client.
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Test data mirrors Python's `agent_msg.json`, `all.json`, and similar DSL files.
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
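The `{{cpn_id@param}}` resolution model can be illustrated with only a regex fast path. The gonja fallback for full Jinja syntax is not shown. The pattern's character classes, the flat `vars` map keyed by the reference text, and `resolveRefs` are assumptions for illustration. The one behaviour taken from this PR is that an unresolvable reference is an error, which the `InvokeFails` test below depends on.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// refPattern matches {{cpn_id@param}} and {{sys.query}}-style references.
// The exact character classes are an assumption.
var refPattern = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_:.\-]+(?:@[A-Za-z0-9_.\-]+)?)\s*\}\}`)

// resolveRefs substitutes every reference from vars and reports the first
// reference it cannot resolve as an error.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	var missing []string
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			missing = append(missing, key)
			return m
		}
		return fmt.Sprint(v)
	})
	if len(missing) > 0 {
		return "", fmt.Errorf("unresolvable template reference %q", strings.Join(missing[:1], ""))
	}
	return out, nil
}

func main() {
	vars := map[string]any{"sys.query": "hi", "begin_0@result": 42}
	s, err := resolveRefs("Q: {{sys.query}} / R: {{ begin_0@result }}", vars)
	fmt.Println(s, err) // Q: hi / R: 42 <nil>
	_, err = resolveRefs("{{llm_0@content}}", vars)
	fmt.Println(err) // unresolvable template reference "llm_0@content"
}
```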
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
329 lines
11 KiB
Go
```go
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Tests for the Phase 4.4 V2 canvas DSL decoder.
//
// Each test pins one branch of decodeCanvasFromDSL:
//   - empty DSL (nil or empty map) → ErrAgentStorageError (no JSON round-trip happens)
//   - non-empty DSL with no components → ErrAgentStorageError
//   - non-empty DSL with one component → clean *canvas.Canvas
//   - non-empty DSL with multiple components → all preserved
//   - top-level globals and graph node parents → preserved on the Canvas
//
// All failures MUST carry ErrAgentStorageError so the handler-side
// mapAgentError classifies them as CodeServerError (500) with the
// sanitized message (no raw decoder-string leak). This is the
// regression pin that closes the v3.5.2 review's "raw decode error
// leak" concern at the Compile/Invoke boundary.
//
// The AgentService constructor tests at the end of the file pin the
// nil-option defaults and the sandbox-client boot wiring.
package service

import (
	"errors"
	"testing"

	agenttool "ragflow/internal/agent/tool"
)

// TestDecodeCanvasFromDSL_EmptyDSL pins the "empty DSL" branch:
// decodeCanvasFromDSL(nil) must return an error wrapping
// ErrAgentStorageError without attempting json.Marshal (the empty
// map serialises to "{}" which would decode to an empty Canvas,
// bypassing the "no components" check).
func TestDecodeCanvasFromDSL_EmptyDSL(t *testing.T) {
	t.Parallel()
	_, err := decodeCanvasFromDSL(nil)
	if err == nil {
		t.Fatal("expected error for empty DSL")
	}
	if !errors.Is(err, ErrAgentStorageError) {
		t.Errorf("expected ErrAgentStorageError in chain, got %v", err)
	}
}

// TestDecodeCanvasFromDSL_EmptyMapSameAsNil pins the equivalent
// branch for an explicitly-empty (but non-nil) map. Same outcome
// as the nil case: the function must reject before marshalling.
func TestDecodeCanvasFromDSL_EmptyMapSameAsNil(t *testing.T) {
	t.Parallel()
	_, err := decodeCanvasFromDSL(map[string]any{})
	if err == nil {
		t.Fatal("expected error for empty DSL map")
	}
	if !errors.Is(err, ErrAgentStorageError) {
		t.Errorf("expected ErrAgentStorageError in chain, got %v", err)
	}
}

// TestDecodeCanvasFromDSL_SingleComponent pins the happy path:
// a DSL with one Begin component decodes to a *canvas.Canvas with
// that component under its id key.
func TestDecodeCanvasFromDSL_SingleComponent(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"components": map[string]any{
			"begin_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Begin",
					"params":         map[string]any{},
				},
				"downstream": []any{"message_0"},
				"upstream":   []any{},
			},
		},
		"path": []any{"begin_0"},
	}
	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if c == nil {
		t.Fatal("decoded Canvas is nil")
	}
	if len(c.Components) != 1 {
		t.Errorf("Components length = %d, want 1", len(c.Components))
	}
	if _, ok := c.Components["begin_0"]; !ok {
		t.Errorf("Components missing begin_0 key")
	}
	if len(c.Path) != 1 || c.Path[0] != "begin_0" {
		t.Errorf("Path = %v, want [begin_0]", c.Path)
	}
}

// TestDecodeCanvasFromDSL_MultiComponent pins the multi-node
// happy path: Begin → Retrieval → Message decodes into 3
// components, each under its id key, with component_name preserved.
// This is the shape real v1 DSLs use.
func TestDecodeCanvasFromDSL_MultiComponent(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"components": map[string]any{
			"begin_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Begin",
					"params":         map[string]any{},
				},
				"downstream": []any{"retrieval_0"},
			},
			"retrieval_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Retrieval",
					"params":         map[string]any{"kb_ids": []any{"kb-1"}},
				},
				"downstream": []any{"message_0"},
				"upstream":   []any{"begin_0"},
			},
			"message_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Message",
					"params":         map[string]any{"text": "hi {{begin_0@result}}"},
				},
				"upstream": []any{"retrieval_0"},
			},
		},
		"path": []any{"begin_0", "retrieval_0", "message_0"},
	}
	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if len(c.Components) != 3 {
		t.Errorf("Components length = %d, want 3", len(c.Components))
	}
	for _, id := range []string{"begin_0", "retrieval_0", "message_0"} {
		if _, ok := c.Components[id]; !ok {
			t.Errorf("Components missing %s", id)
		}
	}
	if got := c.Components["retrieval_0"].Obj.ComponentName; got != "Retrieval" {
		t.Errorf("retrieval_0 component_name = %q, want Retrieval", got)
	}
}

// TestDecodeCanvasFromDSL_NoComponents pins the "non-empty map
// with no components" branch: a DSL whose top-level keys are
// non-component (e.g. only "globals") must still fail because the
// resulting Canvas would have an empty Components map. The error
// must chain ErrAgentStorageError so mapAgentError -> 500.
func TestDecodeCanvasFromDSL_NoComponents(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"globals": map[string]any{
			"sys.query": "",
		},
	}
	_, err := decodeCanvasFromDSL(dsl)
	if err == nil {
		t.Fatal("expected error when DSL has no components")
	}
	if !errors.Is(err, ErrAgentStorageError) {
		t.Errorf("expected ErrAgentStorageError in chain, got %v", err)
	}
}

// TestDecodeCanvasFromDSL_GlobalsPreserved pins that round-tripping
// the DSL through JSON does not lose top-level metadata keys
// (globals, history, retrieval); this test exercises globals. These
// keys are needed at compile time for state-pre / state-post
// handlers to resolve references.
func TestDecodeCanvasFromDSL_GlobalsPreserved(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"components": map[string]any{
			"begin_0": map[string]any{
				"obj": map[string]any{
					"component_name": "Begin",
					"params":         map[string]any{},
				},
				"downstream": []any{},
			},
		},
		"globals": map[string]any{
			"sys.query":       "hello",
			"sys.user_id":     "user-1",
			"env.counter":     0.0,
			"env.loop_done":   false,
			"env.sample_rows": []any{},
		},
	}
	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if c.Globals == nil {
		t.Fatal("Globals is nil after round-trip")
	}
	if got, _ := c.Globals["sys.query"].(string); got != "hello" {
		t.Errorf("Globals[sys.query] = %v, want \"hello\"", c.Globals["sys.query"])
	}
	if got, _ := c.Globals["sys.user_id"].(string); got != "user-1" {
		t.Errorf("Globals[sys.user_id] = %v, want \"user-1\"", c.Globals["sys.user_id"])
	}
}

// TestDecodeCanvasFromDSL_PreservesNodeParents pins that each
// graph.nodes[].parentId entry is carried into Canvas.NodeParents:
// the children of the Iteration group map to their parent id, while
// the outer follower (Message:IterDone) gets no entry.
func TestDecodeCanvasFromDSL_PreservesNodeParents(t *testing.T) {
	t.Parallel()
	dsl := map[string]any{
		"graph": map[string]any{
			"nodes": []any{
				map[string]any{"id": "Iteration:IterateList"},
				map[string]any{"id": "IterationItem:IterStart", "parentId": "Iteration:IterateList"},
				map[string]any{"id": "StringTransform:FmtItem", "parentId": "Iteration:IterateList"},
				map[string]any{"id": "Message:IterDone"},
			},
		},
		"components": map[string]any{
			"Iteration:IterateList": map[string]any{
				"obj": map[string]any{
					"component_name": "Parallel",
					"params":         map[string]any{"items_ref": "sys.items"},
				},
				"downstream": []any{"Message:IterDone"},
			},
			"IterationItem:IterStart": map[string]any{
				"obj": map[string]any{
					"component_name": "IterationItem",
					"params":         map[string]any{},
				},
				"downstream": []any{"StringTransform:FmtItem"},
			},
			"StringTransform:FmtItem": map[string]any{
				"obj": map[string]any{
					"component_name": "StringTransform",
					"params":         map[string]any{"method": "merge", "script": "{item}", "delimiters": []any{"|"}},
				},
			},
			"Message:IterDone": map[string]any{
				"obj": map[string]any{
					"component_name": "Message",
					"params":         map[string]any{"content": []any{"done"}},
				},
			},
		},
	}

	c, err := decodeCanvasFromDSL(dsl)
	if err != nil {
		t.Fatalf("decodeCanvasFromDSL: %v", err)
	}
	if c.NodeParents["IterationItem:IterStart"] != "Iteration:IterateList" {
		t.Fatalf("IterationItem parent = %q, want Iteration:IterateList", c.NodeParents["IterationItem:IterStart"])
	}
	if c.NodeParents["StringTransform:FmtItem"] != "Iteration:IterateList" {
		t.Fatalf("FmtItem parent = %q, want Iteration:IterateList", c.NodeParents["StringTransform:FmtItem"])
	}
	if _, ok := c.NodeParents["Message:IterDone"]; ok {
		t.Fatalf("outer follower Message:IterDone should not be marked as a grouped child")
	}
}

// TestNewAgentServiceWithOptions_NilOptions pins that the new
// constructor accepts all-nil options and produces a usable
// service (no panic on field init, no nil-deref when running
// without Redis). Existing test sites rely on this: the
// zero-arg NewAgentService() now just delegates here.
func TestNewAgentServiceWithOptions_NilOptions(t *testing.T) {
	t.Parallel()
	svc := NewAgentServiceWithOptions(nil, nil, nil)
	if svc == nil {
		t.Fatal("NewAgentServiceWithOptions returned nil")
	}
	if svc.runner == nil {
		t.Error("runner is nil after construction")
	}
	if svc.checkpointStore != nil {
		t.Error("checkpointStore should be nil when constructed with nil")
	}
	if svc.stateSerializer != nil {
		t.Error("stateSerializer should be nil when constructed with nil")
	}
	if svc.runTracker != nil {
		t.Error("runTracker should be nil when constructed with nil")
	}
	if svc.canvasDAO == nil || svc.versionDAO == nil {
		t.Error("DAOs should be non-nil after construction")
	}
}

// TestNewAgentService_DefaultsToNilOptions pins that the legacy
// zero-arg NewAgentService() is functionally equivalent to
// NewAgentServiceWithOptions(nil, nil, nil): the field values
// must match exactly so existing call sites don't observe a
// behavioural change.
func TestNewAgentService_DefaultsToNilOptions(t *testing.T) {
	t.Parallel()
	a := NewAgentService()
	b := NewAgentServiceWithOptions(nil, nil, nil)
	if a.checkpointStore != b.checkpointStore {
		t.Errorf("checkpointStore mismatch: %v vs %v", a.checkpointStore, b.checkpointStore)
	}
	if a.stateSerializer != b.stateSerializer {
		t.Errorf("stateSerializer mismatch: %v vs %v", a.stateSerializer, b.stateSerializer)
	}
	if a.runTracker != b.runTracker {
		t.Errorf("runTracker mismatch: %v vs %v", a.runTracker, b.runTracker)
	}
}

// TestNewAgentService_RegistersSandboxClient pins that constructing
// the service replaces the stub sandbox client with a non-stub one.
// It is intentionally not parallel: it resets and inspects the
// package-global sandbox client in agenttool, which would race with
// other tests that construct the service concurrently.
func TestNewAgentService_RegistersSandboxClient(t *testing.T) {
	agenttool.SetSandboxClient(nil)
	t.Cleanup(func() { agenttool.SetSandboxClient(nil) })

	_ = NewAgentService()

	if stub, ok := agenttool.GetSandboxClient().(interface{ IsStubSandboxClient() bool }); ok && stub.IsStubSandboxClient() {
		t.Fatal("sandbox client remained stub after NewAgentService boot wiring")
	}
}
```