mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.
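The SSRF guard mentioned above is not shown in this description. As a rough illustration only, a pre-flight URL check could look like the sketch below. The names `checkURL` and `isBlockedIP` are hypothetical, and the sketch covers literal IP hosts only; a production guard would also need to validate the addresses a hostname resolves to.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// isBlockedIP reports whether ip points at a loopback, private,
// link-local, or unspecified address.
func isBlockedIP(ip net.IP) bool {
	return ip.IsLoopback() || ip.IsPrivate() ||
		ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() ||
		ip.IsUnspecified()
}

// checkURL (hypothetical helper) rejects non-HTTP schemes and literal
// IP hosts in blocked ranges. Hostnames are NOT resolved here.
func checkURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse url: %w", err)
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return fmt.Errorf("scheme %q not allowed", u.Scheme)
	}
	host := u.Hostname()
	if host == "" {
		return errors.New("missing host")
	}
	if ip := net.ParseIP(host); ip != nil && isBlockedIP(ip) {
		return fmt.Errorf("host %s is in a blocked range", host)
	}
	return nil
}

func main() {
	for _, raw := range []string{
		"https://example.com/search",
		"http://169.254.169.254/latest/meta-data",
		"file:///etc/passwd",
	} {
		fmt.Printf("%s -> %v\n", raw, checkURL(raw))
	}
}
```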
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
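As an illustration of the regex fast path for `{{cpn_id@param}}` references, the sketch below substitutes references from a flat variable map. `resolveRefs` and `refPattern` are hypothetical names, the accepted character set is a guess, and returning an error on an unknown reference is an assumption based on the `InvokeFails` test description; the gonja fallback is not modeled.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches references such as {{sys.query}} or {{llm_0@content}}.
// The accepted character set is an assumption for this sketch.
var refPattern = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_:.@-]+)\s*\}\}`)

// resolveRefs replaces every {{name}} with the matching value from vars.
// It returns an error naming the first reference that cannot be resolved.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	var missing string
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			if missing == "" {
				missing = key
			}
			return m
		}
		return fmt.Sprint(v)
	})
	if missing != "" {
		return "", fmt.Errorf("unresolvable template ref %q", missing)
	}
	return out, nil
}

func main() {
	vars := map[string]any{
		"sys.query":     "what is RAG?",
		"llm_0@content": "Retrieval-augmented generation.",
	}
	out, err := resolveRefs("Q: {{sys.query}}\nA: {{ llm_0@content }}", vars)
	fmt.Println(out, err)
}
```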
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
298 lines
10 KiB
Go
//
//  Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//

package tool

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/cloudwego/eino/components/tool"
	"github.com/cloudwego/eino/schema"
)

// ErrCodeExecSandboxMissing is returned when no sandbox client is
// registered. The Python sandbox itself is kept as-is (the Go
// side never reimplemented the sandbox). When a client is
// registered via SetSandboxClient at boot, the tool dispatches
// the execution.
var ErrCodeExecSandboxMissing = errors.New(
	"CodeExec sandbox client not registered — call SetSandboxClient at boot",
)

const codeExecToolName = "execute_code"

const codeExecToolDescription = "This tool has a sandbox that can execute code written in 'Python'/'Javascript'. " +
	"It receives a piece of code and returns a JSON string."

// codeExecArgs is the JSON shape the model sends in. The Python
// tool accepts "lang" + "script"; we also accept "code" as a
// synonym since some DSLs and tests use that spelling.
type codeExecArgs struct {
	Language string         `json:"language,omitempty"`
	Lang     string         `json:"lang,omitempty"`
	Script   string         `json:"script,omitempty"`
	Code     string         `json:"code,omitempty"`
	Args     map[string]any `json:"arguments,omitempty"`
	// Timeout is the per-execution wall-clock budget in seconds. 0
	// (the default) defers to the sandbox provider's own default
	// (typically 30s). Mirrors Python's
	// `code_exec.py:358 timeout_seconds = int(os.environ.get(...))`
	// but the value flows per-call rather than per-process, which
	// lets the model dial up/down for known-fast vs. known-slow
	// scripts.
	Timeout int `json:"timeout,omitempty"`
}

// codeExecResult is the JSON envelope returned to the model. The output
// shape mirrors the Python tool's `content` / `_ERROR` / `actual_type`
// fields so downstream nodes can pattern-match unchanged. Artifacts and
// Attachments are surfaced for the model and downstream component
// consumption (e.g. Message component's artifact markdown formatter).
type codeExecResult struct {
	Content     string           `json:"content,omitempty"`
	ActualType  string           `json:"actual_type,omitempty"`
	RawResult   any              `json:"raw_result,omitempty"`
	Stub        bool             `json:"stub,omitempty"`
	Error       string           `json:"_ERROR,omitempty"`
	ExitCode    int              `json:"exit_code,omitempty"`
	Stdout      string           `json:"stdout,omitempty"`
	Stderr      string           `json:"stderr,omitempty"`
	Artifacts   []map[string]any `json:"_ARTIFACTS,omitempty"`
	Attachments []map[string]any `json:"attachments,omitempty"`
}

// CodeExecTool is the eino tool for CodeExec. It validates the
// language and requires non-empty code; while no sandbox client is
// registered it returns a structured "not-yet-wired" error.
type CodeExecTool struct{}

// NewCodeExecTool returns a CodeExecTool implementing eino's
// tool.InvokableTool interface.
func NewCodeExecTool() *CodeExecTool {
	return &CodeExecTool{}
}

// Info returns the tool's metadata for the chat model. The schema mirrors
// the Python CodeExecParam ToolMeta (fields aligned).
func (c *CodeExecTool) Info(_ context.Context) (*schema.ToolInfo, error) {
	return &schema.ToolInfo{
		Name: codeExecToolName,
		Desc: codeExecToolDescription,
		ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
			"language": {
				Type:     schema.String,
				Desc:     "The programming language of the code. Allowed: 'python' (or 'python3'), 'javascript' (or 'nodejs').",
				Enum:     []string{"python", "python3", "javascript", "nodejs"},
				Required: true,
			},
			"code": {
				Type:     schema.String,
				Desc:     "The code to execute. Must define a `main` function (Python) or export `main` (JavaScript).",
				Required: true,
			},
		}),
	}, nil
}

// InvokableRun validates the inputs and dispatches to the
// registered sandbox client via SetSandboxClient.
func (c *CodeExecTool) InvokableRun(ctx context.Context, argumentsInJSON string, _ ...tool.Option) (string, error) {
	var args codeExecArgs
	if argumentsInJSON == "" {
		return codeExecStubResult("arguments are required"), errors.New("code_exec: empty arguments")
	}
	if err := json.Unmarshal([]byte(argumentsInJSON), &args); err != nil {
		return codeExecStubResult("invalid JSON: " + err.Error()),
			fmt.Errorf("code_exec: parse arguments: %w", err)
	}

	lang := normalizeCodeExecLang(args.Language, args.Lang)
	if lang == "" {
		return codeExecStubResult("unsupported language: must be 'python'/'python3'/'javascript'/'nodejs'"),
			errors.New("code_exec: invalid language")
	}

	script := args.Script
	if script == "" {
		script = args.Code
	}
	if strings.TrimSpace(script) == "" {
		return codeExecStubResult("code is required"), errors.New("code_exec: empty code")
	}

	// Dispatch to the registered SandboxClient. When the default
	// stub is in place, the call surfaces
	// ErrCodeExecSandboxMissing; once a real client is
	// installed via SetSandboxClient at boot, the script runs.
	client := GetSandboxClient()
	req := SandboxRequest{
		Lang:      lang,
		Script:    script,
		Arguments: args.Args,
		Timeout:   args.Timeout,
	}
	log.Printf("DEBUG CodeExec tool invoke: lang=%q timeout=%d arguments=%#v script=%q", req.Lang, req.Timeout, req.Arguments, req.Script)
	resp, err := client.ExecuteCode(ctx, req)
	if err != nil {
		return codeExecStubResult(err.Error()), err
	}
	out, mErr := codeExecResultJSON(resp)
	if mErr != nil {
		return codeExecStubResult(mErr.Error()), mErr
	}
	return out, nil
}

// codeExecResultJSON serializes a SandboxResponse into the envelope
// the eino tool contract returns. Field mapping mirrors the Python
// tool's `code_exec.py:385-490` `_process_execution_result`:
//
//   - Stdout / Stderr / ExitCode: passed straight through.
//   - Returned → Content (the model's natural "what did main() give
//     us back" field).
//   - StructuredResult["actual_type"] → ActualType (Python
//     `infer_actual_type` surface for downstream Message component).
//   - Metadata["artifacts"] → Artifacts (the model AND the Message
//     component's `_ARTIFACTS` collector both consume this; we
//     surface it as `_ARTIFACTS` to match the Python envelope).
//   - Metadata["attachments"] → Attachments (rendered into
//     downstream Markdown by Message via the same path the Agent
//     tool artifact markdown uses).
//
// Artifacts / Attachments with the wrong element type (anything
// other than map[string]any) are dropped with a warning on stderr.
// This matches the Python tool's "skip on shape mismatch"
// semantics: better to lose one artifact than to abort the run.
func codeExecResultJSON(r *SandboxResponse) (string, error) {
	if r == nil {
		return codeExecStubResult("empty response"), nil
	}
	out := codeExecResult{
		ExitCode: r.ExitCode,
		Stdout:   r.Stdout,
		Stderr:   r.Stderr,
	}
	if r.Metadata != nil {
		out.Artifacts = extractArtifactList(r.Metadata, "artifacts")
		out.Attachments = extractArtifactList(r.Metadata, "attachments")
	}
	hasStructuredResult := false
	resolvedValue, usedStdoutFallback := resolveCodeExecResultValue(r)
	if r.StructuredResult != nil {
		hasStructuredResult, _ = r.StructuredResult["present"].(bool)
	}
	// Treat stderr as the error only when nothing else was produced:
	// no structured result, no artifacts, and no stdout.
	if strings.TrimSpace(r.Stderr) != "" &&
		!hasStructuredResult &&
		len(out.Artifacts) == 0 &&
		strings.TrimSpace(r.Stdout) == "" {
		out.Error = r.Stderr
	} else {
		if usedStdoutFallback && strings.TrimSpace(r.Stdout) != "" {
			fmt.Fprintln(os.Stderr, "code_exec: falling back to stdout deserialization because no structured result metadata was provided")
		}
		out.RawResult = NormalizeCodeExecOutputValue(resolvedValue)
		out.ActualType = InferCodeExecActualType(out.RawResult)
		out.Content = RenderCodeExecCanonicalContent(out.RawResult)
	}
	log.Printf("DEBUG CodeExec tool: structured_result=%#v resolved_value=%#v raw_result=%#v content=%q actual_type=%q stderr=%q stdout=%q",
		r.StructuredResult, resolvedValue, out.RawResult, out.Content, out.ActualType, r.Stderr, r.Stdout)
	b, err := json.Marshal(out)
	if err != nil {
		return "", fmt.Errorf("code_exec: marshal result: %w", err)
	}
	return string(b), nil
}

// extractArtifactList pulls a list of dict-shaped entries out of
// Metadata[key]. Items that aren't map[string]any are dropped with
// a stderr log line so the operator can see the data loss without
// the run aborting.
func extractArtifactList(meta map[string]any, key string) []map[string]any {
	raw, ok := meta[key]
	if !ok {
		return nil
	}
	arr, ok := raw.([]any)
	if !ok {
		return nil
	}
	out := make([]map[string]any, 0, len(arr))
	for i, item := range arr {
		m, ok := item.(map[string]any)
		if !ok {
			fmt.Fprintf(os.Stderr, "code_exec: %s[%d] is %T, expected map[string]any; dropping\n", key, i, item)
			continue
		}
		out = append(out, m)
	}
	return out
}

// codeExecStubResult builds the error envelope ("stub": true plus
// "_ERROR") returned when the call fails before or during execution.
func codeExecStubResult(msg string) string {
	b, err := json.Marshal(codeExecResult{
		Stub:  true,
		Error: msg,
	})
	if err != nil {
		return fmt.Sprintf(`{"_ERROR":"code_exec: marshal stub: %s","stub":true}`, err)
	}
	return string(b)
}

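// resolveCodeExecResultValue returns the structured result value when
// the sandbox marked one as present; otherwise it falls back to
// deserializing stdout and reports true for the fallback flag.
func resolveCodeExecResultValue(r *SandboxResponse) (any, bool) {
	// Guard against a nil response so the stdout fallback cannot panic.
	if r == nil {
		return "", true
	}
	if r.StructuredResult != nil {
		if present, _ := r.StructuredResult["present"].(bool); present {
			return r.StructuredResult["value"], false
		}
	}
	return deserializeCodeExecStdout(r.Stdout), true
}
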
// deserializeCodeExecStdout decodes trimmed stdout as JSON when it
// parses, and otherwise returns the trimmed text unchanged.
func deserializeCodeExecStdout(stdout string) any {
	text := strings.TrimSpace(stdout)
	if text == "" {
		return ""
	}
	var decoded any
	if err := json.Unmarshal([]byte(text), &decoded); err == nil {
		return decoded
	}
	return text
}

// normalizeCodeExecLang accepts the model's literal "language" or the
// Python-style "lang" alias and maps synonyms to the canonical "python" /
// "nodejs" forms used by the Python sandbox.
func normalizeCodeExecLang(primary, alias string) string {
	v := strings.ToLower(strings.TrimSpace(primary))
	if v == "" {
		v = strings.ToLower(strings.TrimSpace(alias))
	}
	switch v {
	case "python", "python3":
		return "python"
	case "javascript", "js", "nodejs", "node":
		return "nodejs"
	}
	return ""
}