Mirror of https://github.com/infiniflow/ragflow.git (synced 2026-07-01 00:05:43 +08:00)
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
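
The four-level timeout lookup named in the Scheduler bullet can be pictured roughly as below. This is a minimal sketch, not the code in `scheduler.go`: the environment variable names (`AGENT_TIMEOUT_<CLASS>`, `AGENT_TIMEOUT`), the `classTimeouts` table, and the function names are hypothetical. Only the precedence order and the 600s fallback come from the description above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// fallbackTimeout is the last-resort value named in the description.
const fallbackTimeout = 600 * time.Second

// classTimeouts is a hypothetical per-class table (level 2).
var classTimeouts = map[string]time.Duration{
	"LLM": 300 * time.Second,
}

// parseSeconds converts a positive integer string into a duration.
func parseSeconds(s string) (time.Duration, bool) {
	n, err := strconv.Atoi(strings.TrimSpace(s))
	if err != nil || n <= 0 {
		return 0, false
	}
	return time.Duration(n) * time.Second, true
}

// resolveTimeout walks the four levels in order. getenv is injected so the
// lookup can be exercised without touching the process environment.
func resolveTimeout(class string, getenv func(string) string) time.Duration {
	// Level 1: per-class environment override (hypothetical variable name).
	if d, ok := parseSeconds(getenv("AGENT_TIMEOUT_" + strings.ToUpper(class))); ok {
		return d
	}
	// Level 2: per-class table.
	if d, ok := classTimeouts[class]; ok {
		return d
	}
	// Level 3: uniform environment override (hypothetical variable name).
	if d, ok := parseSeconds(getenv("AGENT_TIMEOUT")); ok {
		return d
	}
	// Level 4: fixed fallback.
	return fallbackTimeout
}

func main() {
	env := map[string]string{"AGENT_TIMEOUT_INVOKE": "30"}
	getenv := func(k string) string { return env[k] }
	fmt.Println(resolveTimeout("Invoke", getenv))  // 30s
	fmt.Println(resolveTimeout("LLM", getenv))     // 5m0s
	fmt.Println(resolveTimeout("Message", getenv)) // 10m0s
}
```

Injecting `getenv` instead of calling `os.Getenv` directly is a choice made for this sketch so that each level can be checked in isolation.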
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
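
To make the `{{cpn_id@param}}` resolution model concrete, here is a minimal sketch of what a regex fast path could look like. It is an illustration under assumptions, not the `ResolveTemplate` implementation: the pattern, the flat `vars` map, and the name `resolveRefs` are hypothetical, and the gonja fallback is left out entirely. Returning an error for an unknown reference is also an assumption, chosen to echo the "unresolvable template ref" case listed under verification.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches {{name}}, where name is either a component reference
// such as "llm_0@content" or a global such as "sys.query". The accepted
// character set is an assumption made for this sketch.
var refPattern = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_.:-]+(?:@[A-Za-z0-9_.-]+)?)\s*\}\}`)

// resolveRefs substitutes every reference found in tmpl with the matching
// entry in vars and reports all references that could not be resolved.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	var missing []string
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			missing = append(missing, key)
			return m
		}
		return fmt.Sprint(v)
	})
	if len(missing) > 0 {
		return "", fmt.Errorf("unresolved template reference(s): %v", missing)
	}
	return out, nil
}

func main() {
	vars := map[string]any{"sys.query": "hello", "llm_0@content": "world"}
	s, err := resolveRefs("Q: {{sys.query}} A: {{ llm_0@content }}", vars)
	fmt.Println(s, err) // Q: hello A: world <nil>
}
```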
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
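
The SSE framing checked by that test can be sketched with the standard library alone. This is an illustration, not the handler in `internal/handler/`: the function names and the payload shape are hypothetical, and only the `Content-Type` header, the `data: {…}\n\n` frames, and the trailing `data: [DONE]\n\n` come from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamEvents writes each payload as one SSE frame and finishes with the
// [DONE] sentinel frame.
func streamEvents(w http.ResponseWriter, payloads []map[string]any) error {
	w.Header().Set("Content-Type", "text/event-stream")
	flusher, _ := w.(http.Flusher) // nil when the writer cannot flush
	for _, p := range payloads {
		b, err := json.Marshal(p)
		if err != nil {
			return err
		}
		if _, err := fmt.Fprintf(w, "data: %s\n\n", b); err != nil {
			return err
		}
		if flusher != nil {
			flusher.Flush() // push the frame to the client immediately
		}
	}
	_, err := fmt.Fprint(w, "data: [DONE]\n\n")
	return err
}

// render runs streamEvents against an in-memory recorder so the output can
// be inspected without starting a server.
func render(payloads []map[string]any) (contentType, body string, err error) {
	rec := httptest.NewRecorder()
	err = streamEvents(rec, payloads)
	return rec.Header().Get("Content-Type"), rec.Body.String(), err
}

func main() {
	_, body, err := render([]map[string]any{{"event": "message"}})
	if err != nil {
		panic(err)
	}
	fmt.Print(body)
}
```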
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
251 lines
7.7 KiB
Go
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// reset.go: DSL-level "reset" transform that mirrors the runtime
// behaviour of agent/canvas.py:Canvas.reset() in the Python backend.
//
// Python's Canvas.reset() does two things:
//
//  1. Graph.reset(): clears the per-component state (path, in-memory
//     caches) and removes the per-task Redis log/cancel keys.
//
//  2. Per-run state wipe: empties self.history / retrieval / memory,
//     then walks self.globals to zero out every "sys.*" key and to
//     restore every "env.*" key from its declared default in
//     self.variables.
//
// In the Go port there is no per-canvas "Graph" runtime: the
// executor is reconstructed from the DSL on every Run. So the
// Python "Graph.reset()" side (step 1) is implicitly handled by the
// per-run rebuild, and the per-task Redis keys are still owned by the
// Python task executor. The Go port is responsible for the
// per-DSL-state wipe (step 2): it transforms the persisted DSL
// saved in user_canvas.dsl, the same way the Python handler does
// before writing it back via UserCanvasService.update_by_id.
//
// Frontend parity note: api/apps/restful_apis/agent_api.py:992
// (reset_agent) calls Canvas.reset() and returns the reset DSL in
// the response. The Go handler returns the same shape so existing
// frontends that call POST /api/v1/agents/:canvas_id/reset continue
// to receive the new DSL.

package dsl

// ResetForCanvas returns a defensive copy of dsl with all per-run
// state cleared, ready to be persisted back into user_canvas.dsl.
//
// The transform matches the Python Canvas.reset() semantics on the
// persisted DSL:
//
//   - history, retrieval, memory, path → emptied
//   - globals["sys.<name>"] → zeroed by type (string→"", number→0,
//     bool→false, list→[], dict→{}, other→nil)
//   - globals["env.<name>"] → restored from variables[name].value
//     when present; otherwise zeroed by the variable's declared
//     "type" (number→0, boolean→false, object→{}, array→[], else→"")
//
// Anything else in the DSL (graph, components, messages, ...)
// is left untouched, matching the Python implementation which
// only mutates history/retrieval/memory + globals.
func ResetForCanvas(dsl map[string]any) map[string]any {
	if dsl == nil {
		return map[string]any{}
	}
	out := copyMapStringAny(dsl)

	// Per-run accumulators. The Python implementation assigns fresh
	// empty lists to each; we mirror that by replacing whatever is
	// stored under these keys with a fresh slice. Using a fresh slice
	// (not a shared nil sentinel) matches the Python [] list literal
	// in __str__ / reset.
	out["history"] = []any{}
	out["retrieval"] = []any{}
	out["memory"] = []any{}
	out["path"] = []any{}

	// Snapshot variables (env.* defaults) so the env.* reset loop
	// below is stable even when globals is otherwise empty.
	// Deep-copy both maps: the reset loop mutates `globals` in
	// place, and the service layer feeds the same DSL back into
	// the response body after persistence. A shallow copy would
	// leak the wipe back into the caller's view of the row.
	vars, _ := out["variables"].(map[string]any)
	if vars == nil {
		vars = map[string]any{}
	}
	vars = deepCopyMap(vars)

	globals, _ := out["globals"].(map[string]any)
	if globals == nil {
		// An empty / missing globals map is valid: Python's reset
		// iterates self.globals.keys() and is a no-op when empty,
		// leaving globals as the (possibly empty) dict it was. We
		// preserve that shape instead of inserting a nil.
		return out
	}
	globals = deepCopyMap(globals)
	// Stash the (deep-copied) globals back into out so the
	// returned DSL reflects every change the reset loop makes.
	out["globals"] = globals

	// Reset in place on the snapshot. Go map iteration order is
	// non-deterministic, so collect the sys./env. keys first and
	// then mutate the map to avoid any "read+write during
	// iteration" gotcha.
	sysKeys := make([]string, 0)
	envKeys := make([]string, 0)
	for k := range globals {
		switch {
		case len(k) > 4 && k[:4] == "sys.":
			sysKeys = append(sysKeys, k)
		case len(k) > 4 && k[:4] == "env.":
			envKeys = append(envKeys, k)
		}
	}

	for _, k := range sysKeys {
		globals[k] = zeroByType(globals[k])
	}
	for _, k := range envKeys {
		name := k[4:]
		v, ok := vars[name].(map[string]any)
		if !ok {
			// No declared default → empty string, matching the
			// Python `else: self.globals[k] = ""` branch when
			// the variable entry is missing entirely.
			globals[k] = ""
			continue
		}
		if value, present := v["value"]; present && value != nil {
			globals[k] = value
			continue
		}
		globals[k] = zeroByVariableType(v)
	}

	return out
}

// zeroByType returns the type-appropriate "empty" value for v,
// matching the Python reset() branch for sys.* keys:
//
//	string -> ""
//	bool   -> false
//	int    -> 0 (int, int32, int64; the concrete Go type is kept)
//	float  -> 0 (float32, float64; the concrete Go type is kept)
//	list   -> []
//	dict   -> {}
//	other  -> nil
//
// The list / dict branches return a fresh empty container, not a
// shared nil, consistent with the Python literal `[]` / `{}`.
// Primitives (string, bool, int, float) are returned as fresh zero
// values; this is fine because the caller is going to overwrite
// the map entry with the return value anyway.
func zeroByType(v any) any {
	switch v.(type) {
	case string:
		return ""
	case bool:
		return false
	case int:
		return 0
	case int32:
		return int32(0)
	case int64:
		return int64(0)
	case float32:
		return float32(0)
	case float64:
		return float64(0)
	case []any:
		return []any{}
	case map[string]any:
		return map[string]any{}
	default:
		return nil
	}
}

// zeroByVariableType mirrors the Python `else` branch that runs
// when an env.* variable is declared but has no `value` field.
// The Python source keys on the declared "type" string:
//
//	"number"  -> 0
//	"boolean" -> False
//	"object"  -> {}
//	"array*"  -> []
//	else      -> "" (covers "string" and unknown)
func zeroByVariableType(v map[string]any) any {
	t, _ := v["type"].(string)
	switch t {
	case "number":
		return 0
	case "boolean":
		return false
	case "object":
		return map[string]any{}
	}
	if len(t) >= 5 && t[:5] == "array" {
		return []any{}
	}
	return ""
}

// deepCopyMap returns a fresh map with the same keys, recursively
// copying nested map[string]any / []any values. Every other value
// is stored as-is: scalars (strings, numbers, bools) are copied by
// value, while pointers or other container types would remain
// shared with the source. This is a focused helper
// for the reset path: in practice globals is a flat
// string→primitive map and variables is a flat
// string→{type, value} map, so a full deep walk is overkill, but
// the cost is negligible and it eliminates a class of
// "the caller's map got mutated" bugs the shallow `copyMapStringAny`
// helper would let through.
func deepCopyMap(m map[string]any) map[string]any {
	if m == nil {
		return nil
	}
	out := make(map[string]any, len(m))
	for k, v := range m {
		switch x := v.(type) {
		case map[string]any:
			out[k] = deepCopyMap(x)
		case []any:
			out[k] = deepCopySlice(x)
		default:
			out[k] = v
		}
	}
	return out
}

func deepCopySlice(s []any) []any {
	if s == nil {
		return nil
	}
	out := make([]any, len(s))
	for i, v := range s {
		switch x := v.(type) {
		case map[string]any:
			out[i] = deepCopyMap(x)
		case []any:
			out[i] = deepCopySlice(x)
		default:
			out[i] = x
		}
	}
	return out
}