ragflow/internal/agent/tool/code_exec_contract.go


feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)

## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with the **Python `agent/` implementation** so the same stored canvas DSL produces the same execution result on either side. Every component, tool, and runtime primitive in `internal/agent/` is now driven by the same semantics as its Python counterpart: variable resolution, template substitution, control flow, error reporting, retry/cancel, and stream event shapes.

The **retrieval component is the one explicit exception** in this PR. It is being reworked in a separate change and is excluded from this alignment pass; the wrapper slot (`universe_a_wrappers.go → newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)

`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs, MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure, JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent` (react + tool artifact capture + `Reset()` interface-assert) · `Switch` (12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke` · `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`) · `UserFillUp` (Python-equivalent interrupt/resume via eino `compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` · `ListOperations` · `StringTransform` · `VariableAggregator` · `VariableAssigner` · `Browser` (full stagehand runtime parity) · `DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)

`Retrieval` (wrapper slot only, logic out of scope) · `MCPToolAdapter` (streamable-HTTP) · `CodeExec` (sandbox bridge with `code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv` · `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` · `Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG` · `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance`: uniform `eino tool.InvokableTool` interface, SSRF protection, shared HTTP client.

### Canvas execution engine (`internal/agent/canvas/`)

Aligned with Python's `agent/canvas.py`:

- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas, per-component timeout resolver (4-level: per-class env → per-class table → uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent `AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing via `compose.NewGraphMultiBranch`, with the same branch selection semantics as Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` / `IsInterruptError` / `ExtractInterruptContexts`; replaces the deprecated Python sentinel chain with eino's native interrupt API, preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore` Get/Set/Delete, with business metadata (status / canvas_id / parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed / MarkCancelled / AttachCheckpoint, the same lifecycle as the Python run record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` / `errors` / `done` events, same shape as Python's `agent.canvas.RunEvent` payload.

### DSL bridge (`internal/agent/dsl/`)

- `normalize.go`: v1↔v2 collapsed into a single wire format, so Python and Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()` semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.

### Runtime (`internal/agent/runtime/`)

- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`: same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback): Python Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.

## Out of scope (intentionally)

- **`Retrieval` component logic**: wrapped only; full parity lands in a follow-up PR.
- **Frontend**: only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry**: orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production chain** against real Python-shaped DSL fixtures:

```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL → canvas.Compile → cc.Workflow.Invoke → answer extraction
```

using in-memory SQLite + miniredis (no Docker). Covers:

- `TestRunAgent_RealCanvas_BeginMessage`: happy path, `{{sys.query}}` resolution
- `TestRunAgent_RealCanvas_WaitForUserResume`: two-run resume cycle (Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails`: unknown component name → sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails`: unresolvable template ref (Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence`: Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go`: SSE streaming parity (`Content-Type: text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`, OpenAI-compatible non-stream `choices`).

`internal/agent/canvas/fixture_compile_test.go` + per-component tests pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked 2026-06-17): module layout, per-component / per-tool inventory, corner-case catalogue, and the actionable backlog (Section 14, including the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package tool

import (
	"encoding/json"
	"fmt"
	"math"
	"strings"
)
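
// codeExecSystemOutputKeys lists output names reserved for system use.
// They are skipped when selecting the business output and rejected as
// business output names.
var codeExecSystemOutputKeys = map[string]struct{}{
	"content":             {},
	"actual_type":         {},
	"attachments":         {},
	"_ERROR":              {},
	"_ARTIFACTS":          {},
	"_ATTACHMENT_CONTENT": {},
	"raw_result":          {},
	"_created_time":       {},
	"_elapsed_time":       {},
}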
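
// CodeExecContract is the validated result of a CodeExec run.
type CodeExecContract struct {
	BusinessOutput string // name of the single business output
	Value          any    // normalized result value
	ActualType     string // type label inferred from Value, e.g. "Array<Number>"
	Content        string // canonical text rendering of Value
}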

// BuildCodeExecContract validates rawResult against the single business
// output declared in outputs and returns the normalized value together with
// its inferred type label and canonical content.
func BuildCodeExecContract(outputs map[string]any, rawResult any) (*CodeExecContract, error) {
	businessName, businessMeta, err := selectCodeExecBusinessOutput(outputs)
	if err != nil {
		return nil, err
	}
	normalizedValue := NormalizeCodeExecOutputValue(rawResult)
	if err := validateCodeExecTopLevelValueDomain(normalizedValue); err != nil {
		return nil, err
	}
	expectedType := ""
	if meta, ok := businessMeta.(map[string]any); ok {
		// Skip a missing or nil "type": fmt.Sprint(nil) yields "<nil>", which
		// would otherwise be rejected as an unsupported expected type.
		if t, ok := meta["type"]; ok && t != nil {
			expectedType = strings.TrimSpace(fmt.Sprint(t))
		}
	}
	if err := validateCodeExecExpectedType(expectedType, normalizedValue, ""); err != nil {
		return nil, err
	}
	return &CodeExecContract{
		BusinessOutput: businessName,
		Value:          normalizedValue,
		ActualType:     InferCodeExecActualType(normalizedValue),
		Content:        RenderCodeExecCanonicalContent(normalizedValue),
	}, nil
}

// NormalizeCodeExecOutputValue returns a deep copy of nested []any and
// map[string]any containers. All other values are returned unchanged.
func NormalizeCodeExecOutputValue(value any) any {
	switch v := value.(type) {
	case []any:
		out := make([]any, 0, len(v))
		for _, item := range v {
			out = append(out, NormalizeCodeExecOutputValue(item))
		}
		return out
	case map[string]any:
		out := make(map[string]any, len(v))
		for key, item := range v {
			out[key] = NormalizeCodeExecOutputValue(item)
		}
		return out
	default:
		return v
	}
}

// InferCodeExecActualType returns the contract type label for value: "Null",
// "Boolean", "String", "Number", "Object", "Array<T>", or "Any". An empty or
// mixed-type slice is "Array<Any>"; NaN and infinite floats are "Any".
func InferCodeExecActualType(value any) string {
	value = NormalizeCodeExecOutputValue(value)
	switch v := value.(type) {
	case nil:
		return "Null"
	case bool:
		return "Boolean"
	case string:
		return "String"
	case map[string]any:
		return "Object"
	case []any:
		if len(v) == 0 {
			return "Array<Any>"
		}
		// The element type is kept only if every item agrees with the first.
		first := InferCodeExecActualType(v[0])
		for _, item := range v[1:] {
			if InferCodeExecActualType(item) != first {
				return "Array<Any>"
			}
		}
		return "Array<" + first + ">"
	case json.Number:
		return "Number"
	case float64:
		if math.IsNaN(v) || math.IsInf(v, 0) {
			return "Any"
		}
		return "Number"
	case float32:
		if math.IsNaN(float64(v)) || math.IsInf(float64(v), 0) {
			return "Any"
		}
		return "Number"
	case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
		return "Number"
	default:
		return "Any"
	}
}
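
// RenderCodeExecCanonicalContent renders value as text: nil becomes the empty
// string, strings are returned as-is, objects and arrays are rendered as
// indented JSON, and everything else goes through fmt.Sprint.
func RenderCodeExecCanonicalContent(value any) string {
	value = NormalizeCodeExecOutputValue(value)
	switch v := value.(type) {
	case nil:
		return ""
	case string:
		return v
	case map[string]any, []any:
		b, err := json.MarshalIndent(v, "", " ")
		if err != nil {
			return fmt.Sprint(v)
		}
		return string(b)
	default:
		return fmt.Sprint(v)
	}
}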

// selectCodeExecBusinessOutput picks the single business output from outputs.
// A lone entry is used directly. Otherwise reserved system keys are skipped
// and exactly one entry must remain.
func selectCodeExecBusinessOutput(outputs map[string]any) (string, any, error) {
	if len(outputs) == 1 {
		for name, meta := range outputs {
			if err := validateCodeExecBusinessOutputName(name); err != nil {
				return "", nil, err
			}
			return name, meta, nil
		}
	}
	var (
		businessName string
		businessMeta any
		count        int
	)
	for name, meta := range outputs {
		if _, reserved := codeExecSystemOutputKeys[name]; reserved {
			continue
		}
		count++
		businessName = name
		businessMeta = meta
	}
	if count != 1 {
		return "", nil, fmt.Errorf("CodeExec contract must contain exactly one business output, got %d", count)
	}
	if err := validateCodeExecBusinessOutputName(businessName); err != nil {
		return "", nil, err
	}
	return businessName, businessMeta, nil
}

// validateCodeExecBusinessOutputName rejects names that are empty after
// trimming, reserved for system use, or contain a '.'.
func validateCodeExecBusinessOutputName(name string) error {
	name = strings.TrimSpace(name)
	if name == "" {
		return fmt.Errorf("CodeExec business output name must not be empty")
	}
	if _, reserved := codeExecSystemOutputKeys[name]; reserved {
		return fmt.Errorf("CodeExec reserved output name is not allowed: %s", name)
	}
	if strings.Contains(name, ".") {
		return fmt.Errorf("CodeExec business output name must not contain '.': %s", name)
	}
	return nil
}

// validateCodeExecTopLevelValueDomain checks that the top-level result has a
// Go type the contract can represent.
func validateCodeExecTopLevelValueDomain(value any) error {
	switch value.(type) {
	case nil, bool, string, map[string]any, []any, json.Number,
		float64, float32, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
		return nil
	default:
		return fmt.Errorf(
			"CodeExec unsupported top-level result type: %T. Allowed top-level values are String, Number, Boolean, Object, Array, or Null.",
			value,
		)
	}
}

// validateCodeExecExpectedType checks value against expectedType. An empty
// type or "Any" accepts everything. Array<T> is checked element by element,
// and path (for example "[0][2]") locates the offending element in errors.
func validateCodeExecExpectedType(expectedType string, value any, path string) error {
	etype, err := normalizeCodeExecExpectedType(expectedType)
	if err != nil {
		return err
	}
	if etype == "" || strings.EqualFold(etype, "Any") {
		return nil
	}
	value = NormalizeCodeExecOutputValue(value)
	if strings.HasPrefix(etype, "Array<") && strings.HasSuffix(etype, ">") {
		list, ok := value.([]any)
		if !ok {
			return fmt.Errorf(
				"CodeExec contract mismatch at %s: expected type %s, got %s",
				codeExecPathOrValue(path), etype, InferCodeExecActualType(value),
			)
		}
		innerType := strings.TrimSpace(etype[len("Array<") : len(etype)-1])
		for i, item := range list {
			childPath := fmt.Sprintf("[%d]", i)
			if path != "" {
				childPath = path + childPath
			}
			if err := validateCodeExecExpectedType(innerType, item, childPath); err != nil {
				return err
			}
		}
		return nil
	}
	valid := false
	switch etype {
	case "String":
		_, valid = value.(string)
	case "Number":
		switch value.(type) {
		case json.Number, float64, float32, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
			valid = true
		}
	case "Boolean":
		_, valid = value.(bool)
	case "Object":
		_, valid = value.(map[string]any)
	case "Null":
		valid = value == nil
	default:
		return fmt.Errorf("Unsupported expected type: %s", expectedType)
	}
	if valid {
		return nil
	}
	return fmt.Errorf(
		"CodeExec contract mismatch at %s: expected type %s, got %s",
		codeExecPathOrValue(path), etype, InferCodeExecActualType(value),
	)
}

// normalizeCodeExecExpectedType maps a case-insensitive type name to its
// canonical spelling and normalizes Array<T> recursively. Unknown names are
// returned unchanged and rejected later by validateCodeExecExpectedType.
func normalizeCodeExecExpectedType(expectedType string) (string, error) {
	etype := strings.TrimSpace(expectedType)
	if etype == "" {
		return "", nil
	}
	switch strings.ToLower(etype) {
	case "string":
		return "String", nil
	case "number":
		return "Number", nil
	case "boolean":
		return "Boolean", nil
	case "object":
		return "Object", nil
	case "null":
		return "Null", nil
	case "any":
		return "Any", nil
	}
	low := strings.ToLower(etype)
	if strings.HasPrefix(low, "array<") && strings.HasSuffix(etype, ">") {
		inner := strings.TrimSpace(etype[len("Array<") : len(etype)-1])
		if inner == "" {
			return "", fmt.Errorf("Unsupported expected type: %s", expectedType)
		}
		normalizedInner, err := normalizeCodeExecExpectedType(inner)
		if err != nil {
			return "", err
		}
		return "Array<" + normalizedInner + ">", nil
	}
	return etype, nil
}
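
// codeExecPathOrValue returns path, or "value" when path is blank, for use in
// error messages about the top-level result.
func codeExecPathOrValue(path string) string {
	if strings.TrimSpace(path) == "" {
		return "value"
	}
	return path
}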