mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
### Tools (aligned with `agent/tools/`, `Retrieval` excepted)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
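
To illustrate the `{{cpn_id@param}}` resolution model, here is a minimal sketch of a regex-only substitution pass. It is not the `ResolveTemplate` implementation: the reference grammar in `refPattern`, the flat `vars` map, and the name `resolveRefs` are assumptions made for the example, and the gonja fallback is not shown.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches references such as {{llm_0@content}} or {{sys.query}}.
// The exact grammar accepted by the runtime is not given in this description;
// this pattern is an assumption for illustration.
var refPattern = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_.:-]+(?:@[A-Za-z0-9_.-]+)?)\s*\}\}`)

// resolveRefs replaces every reference with the string form of its value in
// vars. It returns an error naming the first reference that has no value.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	missing := ""
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			if missing == "" {
				missing = key
			}
			return m
		}
		return fmt.Sprint(v)
	})
	if missing != "" {
		return "", fmt.Errorf("unresolved template reference %q", missing)
	}
	return out, nil
}
```

With `vars` holding `sys.query` and `llm_0@content`, a template such as `Q: {{sys.query}} A: {{ llm_0@content }}` resolves in a single pass; an unknown reference is reported as an error rather than left in the output.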
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
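
The streaming frame shape checked by the handler test can be sketched like this. It is an illustrative encoder only, assuming a hypothetical `encodeSSE` helper and a made-up payload key; the real handler and its payload fields are not reproduced here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// encodeSSE renders each payload as a `data: <json>` frame followed by a
// blank line, then appends the terminating `data: [DONE]` frame.
func encodeSSE(payloads []map[string]any) (string, error) {
	var sb strings.Builder
	for _, p := range payloads {
		b, err := json.Marshal(p)
		if err != nil {
			return "", err
		}
		fmt.Fprintf(&sb, "data: %s\n\n", b)
	}
	sb.WriteString("data: [DONE]\n\n")
	return sb.String(), nil
}
```

In an HTTP handler the same frames would be written to the response with `Content-Type: text/event-stream` set and the writer flushed after each frame.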
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
336 lines
10 KiB
Go
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package tool

import (
	"context"
	"encoding/json"
	"errors"
	"strings"
	"testing"
)

func TestCodeExec_StubsErrorWhenClientMissing(t *testing.T) {
	t.Parallel()

	c := NewCodeExecTool()
	out, err := c.InvokableRun(context.Background(), `{"language":"python","code":"def main(): return {}"}`)
	if !errors.Is(err, ErrCodeExecSandboxMissing) {
		t.Fatalf("err = %v, want ErrCodeExecSandboxMissing", err)
	}

	var got codeExecResult
	if jerr := json.Unmarshal([]byte(out), &got); jerr != nil {
		t.Fatalf("output is not valid JSON: %v (raw=%s)", jerr, out)
	}
	if !got.Stub {
		t.Errorf("Stub = false, want true")
	}
	if !strings.Contains(got.Error, "sandbox") {
		t.Errorf("Error = %q, want to mention 'sandbox'", got.Error)
	}
}

func TestCodeExec_RejectsEmptyCode(t *testing.T) {
	t.Parallel()

	c := NewCodeExecTool()
	_, err := c.InvokableRun(context.Background(), `{"language":"python","code":""}`)
	if err == nil || !strings.Contains(err.Error(), "code") {
		t.Fatalf("err = %v, want to mention empty code", err)
	}
}

func TestCodeExec_RejectsBadLanguage(t *testing.T) {
	t.Parallel()

	c := NewCodeExecTool()
	_, err := c.InvokableRun(context.Background(), `{"language":"brainfuck","code":"x"}`)
	if err == nil || !strings.Contains(err.Error(), "language") {
		t.Fatalf("err = %v, want to reject unsupported language", err)
	}
}

func TestCodeExec_AcceptsLangAlias(t *testing.T) {
	t.Parallel()

	c := NewCodeExecTool()
	// Python tool also accepts "lang" as the field name; the Go shell
	// should still reach the stub branch.
	_, err := c.InvokableRun(context.Background(), `{"lang":"nodejs","script":"async function main() {}"}`)
	if !errors.Is(err, ErrCodeExecSandboxMissing) {
		t.Fatalf("err = %v, want ErrCodeExecSandboxMissing", err)
	}
}

func TestCodeExec_Info(t *testing.T) {
	t.Parallel()

	c := NewCodeExecTool()
	info, err := c.Info(context.Background())
	if err != nil {
		t.Fatalf("Info: %v", err)
	}
	if info.Name != "execute_code" {
		t.Errorf("Name = %q, want execute_code", info.Name)
	}
	if !strings.Contains(info.Desc, "Python") {
		t.Errorf("Desc = %q, want to mention Python", info.Desc)
	}
}

// TestCodeExec_ResultExtractsArtifacts pins the artifact
// collection: SandboxResponse.Metadata["artifacts"] must be
// surfaced as `_ARTIFACTS` in the tool's JSON envelope so the
// Message component's artifact markdown formatter can render them.
func TestCodeExec_ResultExtractsArtifacts(t *testing.T) {
	t.Parallel()

	resp := &SandboxResponse{
		Returned: "ok",
		ExitCode: 0,
		Metadata: map[string]any{
			"artifacts": []any{
				map[string]any{"name": "chart.png", "url": "minio://b/chart.png"},
				map[string]any{"name": "data.csv", "url": "minio://b/data.csv"},
			},
		},
	}
	out, err := codeExecResultJSON(resp)
	if err != nil {
		t.Fatalf("codeExecResultJSON: %v", err)
	}
	var got codeExecResult
	if jerr := json.Unmarshal([]byte(out), &got); jerr != nil {
		t.Fatalf("output not valid JSON: %v (raw=%s)", jerr, out)
	}
	if len(got.Artifacts) != 2 {
		t.Fatalf("Artifacts len = %d, want 2", len(got.Artifacts))
	}
	if got.Artifacts[0]["name"] != "chart.png" {
		t.Errorf("Artifacts[0][name] = %v, want chart.png", got.Artifacts[0]["name"])
	}
}

// TestCodeExec_ResultDropsBadArtifactShape ensures the extractor
// silently drops entries that aren't map[string]any rather than
// aborting the run.
func TestCodeExec_ResultDropsBadArtifactShape(t *testing.T) {
	t.Parallel()

	resp := &SandboxResponse{
		Returned: "ok",
		Metadata: map[string]any{
			"artifacts": []any{
				"just a string",                  // bad shape
				map[string]any{"name": "ok.png"}, // good
				42,                               // bad shape
			},
		},
	}
	out, err := codeExecResultJSON(resp)
	if err != nil {
		t.Fatalf("codeExecResultJSON: %v", err)
	}
	var got codeExecResult
	if jerr := json.Unmarshal([]byte(out), &got); jerr != nil {
		t.Fatalf("output not valid JSON: %v", jerr)
	}
	// Fatalf, not Errorf: indexing Artifacts[0] below would panic on an
	// empty slice.
	if len(got.Artifacts) != 1 {
		t.Fatalf("Artifacts len = %d, want 1 (bad shapes dropped)", len(got.Artifacts))
	}
	if got.Artifacts[0]["name"] != "ok.png" {
		t.Errorf("Artifacts[0][name] = %v, want ok.png", got.Artifacts[0]["name"])
	}
}

// TestCodeExec_ResultExtractsAttachments pins the attachments
// (rendered to downstream Message markdown) path. Distinct from
// artifacts so renderers can route them differently.
func TestCodeExec_ResultExtractsAttachments(t *testing.T) {
	t.Parallel()

	resp := &SandboxResponse{
		Returned: "ok",
		Metadata: map[string]any{
			"attachments": []any{
				map[string]any{"name": "report.pdf", "url": "minio://b/report.pdf"},
			},
		},
	}
	out, err := codeExecResultJSON(resp)
	if err != nil {
		t.Fatalf("codeExecResultJSON: %v", err)
	}
	var got codeExecResult
	if jerr := json.Unmarshal([]byte(out), &got); jerr != nil {
		t.Fatalf("output not valid JSON: %v", jerr)
	}
	if len(got.Attachments) != 1 {
		t.Fatalf("Attachments len = %d, want 1", len(got.Attachments))
	}
}

// TestCodeExec_ResultSurfacesActualType pins the actual_type
// surface used by Message component to render the right Markdown
// formatting (Number → <code>, Object → JSON dump, etc.).
func TestCodeExec_ResultSurfacesActualType(t *testing.T) {
	t.Parallel()

	resp := &SandboxResponse{
		StructuredResult: map[string]any{
			"present": true,
			"value": map[string]any{
				"x": float64(1),
			},
		},
	}
	out, err := codeExecResultJSON(resp)
	if err != nil {
		t.Fatalf("codeExecResultJSON: %v", err)
	}
	var got codeExecResult
	if jerr := json.Unmarshal([]byte(out), &got); jerr != nil {
		t.Fatalf("output not valid JSON: %v", jerr)
	}
	if got.ActualType != "Object" {
		t.Errorf("ActualType = %q, want Object", got.ActualType)
	}
	if got.Content != "{\n \"x\": 1\n}" {
		t.Errorf("Content = %q, want pretty JSON object", got.Content)
	}
}

func TestCodeExec_ResultUsesStructuredResultValue(t *testing.T) {
	t.Parallel()

	resp := &SandboxResponse{
		Returned: "8",
		StructuredResult: map[string]any{
			"present": true,
			"value":   float64(8),
		},
	}
	out, err := codeExecResultJSON(resp)
	if err != nil {
		t.Fatalf("codeExecResultJSON: %v", err)
	}
	var got map[string]any
	if jerr := json.Unmarshal([]byte(out), &got); jerr != nil {
		t.Fatalf("output not valid JSON: %v", jerr)
	}
	if got["raw_result"] != float64(8) {
		t.Fatalf("raw_result = %#v, want 8", got["raw_result"])
	}
	if got["content"] != "8" {
		t.Fatalf("content = %#v, want \"8\"", got["content"])
	}
	if got["actual_type"] != "Number" {
		t.Fatalf("actual_type = %#v, want Number", got["actual_type"])
	}
}

func TestCodeExec_ResultFallsBackToStdoutJSON(t *testing.T) {
	t.Parallel()

	resp := &SandboxResponse{
		Stdout: `{"a":[1,2]}`,
	}
	out, err := codeExecResultJSON(resp)
	if err != nil {
		t.Fatalf("codeExecResultJSON: %v", err)
	}
	var got map[string]any
	if jerr := json.Unmarshal([]byte(out), &got); jerr != nil {
		t.Fatalf("output not valid JSON: %v", jerr)
	}
	raw, ok := got["raw_result"].(map[string]any)
	if !ok {
		t.Fatalf("raw_result type = %T, want map[string]any", got["raw_result"])
	}
	arr, ok := raw["a"].([]any)
	if !ok || len(arr) != 2 || arr[0] != float64(1) || arr[1] != float64(2) {
		t.Fatalf("raw_result[a] = %#v, want [1 2]", raw["a"])
	}
	if got["actual_type"] != "Object" {
		t.Fatalf("actual_type = %#v, want Object", got["actual_type"])
	}
	if got["content"] != "{\n \"a\": [\n 1,\n 2\n ]\n}" {
		t.Fatalf("content = %#v, want pretty JSON", got["content"])
	}
}

// TestCodeExec_PassesTimeoutToSandbox verifies the new
// `timeout` arg flows into the SandboxRequest.Timeout field so
// the model can dial per-script budgets. Note: this test
// mutates the global sandbox client; it must NOT run in
// parallel with the other CodeExec tests that depend on the
// default (loud-fail) stub.
func TestCodeExec_PassesTimeoutToSandbox(t *testing.T) {
	var captured SandboxRequest
	prev := GetSandboxClient()
	SetSandboxClient(stubSandbox(func(_ context.Context, req SandboxRequest) (*SandboxResponse, error) {
		captured = req
		return &SandboxResponse{Returned: "ok", ExitCode: 0}, nil
	}))
	t.Cleanup(func() { SetSandboxClient(prev) })

	c := NewCodeExecTool()
	_, err := c.InvokableRun(context.Background(),
		`{"language":"python","code":"def main(): return {}","timeout":42}`)
	if err != nil {
		t.Fatalf("InvokableRun: %v", err)
	}
	if captured.Timeout != 42 {
		t.Errorf("SandboxRequest.Timeout = %d, want 42", captured.Timeout)
	}
}

// TestCodeExec_PassesArgumentsToSandbox verifies the `arguments`
// arg (Python `**kwargs` to main()) is propagated. Like the
// timeout test, this mutates the global sandbox client and must
// not run in parallel with sibling CodeExec tests.
func TestCodeExec_PassesArgumentsToSandbox(t *testing.T) {
	var captured SandboxRequest
	prev := GetSandboxClient()
	SetSandboxClient(stubSandbox(func(_ context.Context, req SandboxRequest) (*SandboxResponse, error) {
		captured = req
		return &SandboxResponse{Returned: "ok", ExitCode: 0}, nil
	}))
	t.Cleanup(func() { SetSandboxClient(prev) })

	c := NewCodeExecTool()
	_, err := c.InvokableRun(context.Background(),
		`{"language":"python","code":"def main(**kw): return kw","arguments":{"x":1,"y":"z"}}`)
	if err != nil {
		t.Fatalf("InvokableRun: %v", err)
	}
	// Comma-ok assertions so a missing or mistyped argument fails the
	// test with a message instead of panicking.
	x, xok := captured.Arguments["x"].(float64)
	y, yok := captured.Arguments["y"].(string)
	if !xok || !yok || x != 1 || y != "z" {
		t.Errorf("Arguments = %v, want {x:1, y:z}", captured.Arguments)
	}
}

// stubSandbox adapts a function literal to the SandboxClient
// interface so the timeout / arguments tests can capture the
// request without depending on the default stub.
type stubSandbox func(ctx context.Context, req SandboxRequest) (*SandboxResponse, error)

func (s stubSandbox) ExecuteCode(ctx context.Context, req SandboxRequest) (*SandboxResponse, error) {
	return s(ctx, req)
}