Files
ragflow/internal/agent/component/data_operations.go
Zhichang Yu 3f805a64f1 feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)
## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.

The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.

### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.

### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.

### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.

## Out of scope (intentionally)

- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).

`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00

543 lines
15 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package component — DataOperations (T3, plan §2.11.3 row 16).
//
// DataOperations applies one of seven dict/list transforms to a list
// of dicts pulled from the canvas state. It is pure: no state writes;
// the transformed payload is returned at outputs["result"].
//
// Operations:
// - select_keys : keep only the listed keys per dict
// - literal_eval : walk input_objects; try to parse JSON-like
// string leaves (the Go port uses json.Unmarshal
// as a stand-in for Python's ast.literal_eval —
// tuples/sets are NOT supported, matching the
// JSON-shaped LLM output the canvas typically
// consumes).
// - combine : merge all input dicts into one
// - filter_values : keep dicts matching all rules
// - append_or_update: apply updates [{key, value}] per dict
// - remove_keys : drop the listed keys per dict
// - rename_keys : rename per [{old_key, new_key}] per dict
//
// Mirrors agent/component/data_operations.py.
package component
import (
"context"
"encoding/json"
"fmt"
"strings"
"ragflow/internal/agent/runtime"
)
const componentNameDataOperations = "DataOperations"
// dataOperationsParam is the static configuration.
type dataOperationsParam struct {
Query []string `json:"query"`
Operations string `json:"operations"`
SelectKeys []string `json:"select_keys"`
FilterValues []map[string]any `json:"filter_values"`
Updates []map[string]any `json:"updates"`
RemoveKeys []string `json:"remove_keys"`
RenameKeys []map[string]any `json:"rename_keys"`
}
// Update copies a fresh param map into the receiver.
func (p *dataOperationsParam) Update(conf map[string]any) error {
if conf == nil {
conf = map[string]any{}
}
p.Query = toStringSlice(conf["query"])
p.Operations, _ = conf["operations"].(string)
if p.Operations == "" {
p.Operations = "literal_eval"
}
p.SelectKeys = toStringSlice(conf["select_keys"])
p.FilterValues = toMapSlice(conf["filter_values"])
p.Updates = toMapSlice(conf["updates"])
p.RemoveKeys = toStringSlice(conf["remove_keys"])
p.RenameKeys = toMapSlice(conf["rename_keys"])
return nil
}
// Check validates the param.
func (p *dataOperationsParam) Check() error {
switch p.Operations {
case "select_keys", "literal_eval", "combine", "filter_values",
"append_or_update", "remove_keys", "rename_keys":
// ok
default:
return &ParamError{
Field: "operations",
Reason: "must be one of: select_keys, literal_eval, combine, filter_values, append_or_update, remove_keys, rename_keys",
}
}
return nil
}
// AsDict returns the params as a plain map.
func (p *dataOperationsParam) AsDict() map[string]any {
return map[string]any{
"query": p.Query,
"operations": p.Operations,
"select_keys": p.SelectKeys,
"filter_values": p.FilterValues,
"updates": p.Updates,
"remove_keys": p.RemoveKeys,
"rename_keys": p.RenameKeys,
}
}
// toStringSlice normalizes a value to []string. Strings (CSV) and
// []any are accepted; nil returns nil.
func toStringSlice(v any) []string {
switch x := v.(type) {
case nil:
return nil
case string:
// CSV fallback: "a,b,c" → ["a","b","c"]
parts := strings.Split(x, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
s := strings.TrimSpace(p)
if s != "" {
out = append(out, s)
}
}
return out
case []any:
out := make([]string, 0, len(x))
for _, item := range x {
if s, ok := item.(string); ok {
out = append(out, s)
}
}
return out
case []string:
return append([]string{}, x...)
}
return nil
}
// toMapSlice normalizes a value to []map[string]any.
func toMapSlice(v any) []map[string]any {
switch x := v.(type) {
case nil:
return nil
case []any:
out := make([]map[string]any, 0, len(x))
for _, item := range x {
if m, ok := item.(map[string]any); ok {
out = append(out, m)
}
}
return out
case []map[string]any:
return append([]map[string]any{}, x...)
}
return nil
}
// DataOperationsComponent implements the 7 dict transforms.
type DataOperationsComponent struct {
name string
param dataOperationsParam
}
// NewDataOperationsComponent constructs a DataOperations from the
// DSL param map.
func NewDataOperationsComponent(params map[string]any) (Component, error) {
p := &dataOperationsParam{}
if err := p.Update(params); err != nil {
return nil, fmt.Errorf("DataOperations: param update: %w", err)
}
if err := p.Check(); err != nil {
return nil, fmt.Errorf("DataOperations: param check: %w", err)
}
return &DataOperationsComponent{
name: componentNameDataOperations,
param: *p,
}, nil
}
// Name returns the registered component name.
func (d *DataOperationsComponent) Name() string { return d.name }
// Invoke loads input_objects from the configured query refs, then
// dispatches to the operation-specific helper.
func (d *DataOperationsComponent) Invoke(ctx context.Context, _ map[string]any) (map[string]any, error) {
state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx)
if err != nil {
return nil, fmt.Errorf("DataOperations: %w", err)
}
if state == nil {
return nil, fmt.Errorf("DataOperations: nil canvas state")
}
// Coerce query to a list: param.query may arrive as a single
// string in the JSON DSL, which the Python code wraps in [x].
queries := d.param.Query
if len(queries) == 0 {
// fall back to single ref parsed from a string param — when
// the engine loads the DSL it may pass a single ref; tolerate.
queries = []string{}
}
var inputObjects []map[string]any
for _, ref := range queries {
if ref == "" {
continue
}
v, err := state.GetVar(ref)
if err != nil {
return nil, fmt.Errorf("DataOperations: query %q: %w", ref, err)
}
if v == nil {
continue
}
switch x := v.(type) {
case map[string]any:
inputObjects = append(inputObjects, x)
case []any:
for _, item := range x {
if m, ok := item.(map[string]any); ok {
inputObjects = append(inputObjects, m)
}
}
}
}
var result any
switch d.param.Operations {
case "select_keys":
result = d.opSelectKeys(inputObjects)
case "literal_eval":
result = d.opLiteralEval(inputObjects)
case "combine":
result = d.opCombine(inputObjects)
case "filter_values":
result = d.opFilterValues(state, inputObjects)
case "append_or_update":
result = d.opAppendOrUpdate(state, inputObjects)
case "remove_keys":
result = d.opRemoveKeys(inputObjects)
case "rename_keys":
result = d.opRenameKeys(inputObjects)
}
return map[string]any{"result": result}, nil
}
// Stream mirrors Invoke; DataOperations is a single-shot transform.
func (d *DataOperationsComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
out, err := d.Invoke(ctx, inputs)
if err != nil {
return nil, err
}
ch := make(chan map[string]any, 1)
ch <- out
close(ch)
return ch, nil
}
// Inputs returns an empty surface — all config is in the param.
func (d *DataOperationsComponent) Inputs() map[string]string {
return map[string]string{
"query": "List of data items to operate on. Can override the static DSL param.",
"operations": "Operation name: literal_eval, select_keys, filter, update, remove, rename. Overrides DSL param.",
"select_keys": "List of keys to keep (for select_keys operation). Overrides DSL param.",
"filter_values": "List of {key, op, value} filters (for filter operation). Overrides DSL param.",
"updates": "List of {key, value} updates (for update operation). Overrides DSL param.",
"remove_keys": "List of keys to remove (for remove operation). Overrides DSL param.",
"rename_keys": "List of {old_key, new_key} mappings (for rename operation). Overrides DSL param.",
}
}
// Outputs returns the transformed payload.
func (d *DataOperationsComponent) Outputs() map[string]string {
return map[string]string{
"result": "Transformed payload: a list of dicts for most ops, or a single dict for combine.",
}
}
// opSelectKeys keeps only the listed keys per dict. Result is []any
// of dicts.
func (d *DataOperationsComponent) opSelectKeys(items []map[string]any) []any {
keep := make(map[string]struct{}, len(d.param.SelectKeys))
for _, k := range d.param.SelectKeys {
keep[k] = struct{}{}
}
out := make([]any, 0, len(items))
for _, item := range items {
cp := make(map[string]any, len(keep))
for k := range item {
if _, ok := keep[k]; ok {
cp[k] = item[k]
}
}
out = append(out, cp)
}
return out
}
// opLiteralEval walks the input list and tries to JSON-decode any
// string leaf that looks like a JSON literal. Returns a list of
// (possibly-mutated) dicts.
func (d *DataOperationsComponent) opLiteralEval(items []map[string]any) []any {
out := make([]any, 0, len(items))
for _, item := range items {
out = append(out, recursiveEval(item))
}
return out
}
// recursiveEval mirrors the Python _recursive_eval helper: any string
// that starts with a JSON delimiter or known literal is unmarshaled.
// On failure, the original string is returned.
func recursiveEval(v any) any {
switch x := v.(type) {
case map[string]any:
out := make(map[string]any, len(x))
for k, val := range x {
out[k] = recursiveEval(val)
}
return out
case []any:
out := make([]any, 0, len(x))
for _, item := range x {
out = append(out, recursiveEval(item))
}
return out
case string:
s := strings.TrimSpace(x)
if s == "" {
return x
}
// Detect likely JSON literal: starts with one of { [ ( " '
// digit, or is a known scalar literal (true/false/null).
first := s[0]
lower := strings.ToLower(s)
isLiteral := false
switch first {
case '{', '[', '(', '"', '\'':
isLiteral = true
}
if !isLiteral {
// digit
if first >= '0' && first <= '9' {
isLiteral = true
}
}
if !isLiteral && (lower == "true" || lower == "false" || lower == "null" || lower == "none") {
isLiteral = true
}
if !isLiteral {
return x
}
var parsed any
// Try JSON. If it fails, return the original string.
if err := json.Unmarshal([]byte(s), &parsed); err == nil {
return parsed
}
return x
}
return v
}
// opCombine merges all input dicts into one. Key conflicts:
// - existing is a list → extend (or append if new is scalar)
// - existing is scalar, new is list → wrap as [old, *new]
// - existing is scalar, new is scalar → wrap as [old, new]
func (d *DataOperationsComponent) opCombine(items []map[string]any) map[string]any {
out := map[string]any{}
for _, obj := range items {
for k, v := range obj {
existing, ok := out[k]
if !ok {
out[k] = v
continue
}
switch ex := existing.(type) {
case []any:
if vl, ok := v.([]any); ok {
out[k] = append(ex, vl...)
} else {
out[k] = append(ex, v)
}
default:
if vl, ok := v.([]any); ok {
out[k] = []any{ex, vl}
} else {
out[k] = []any{ex, v}
}
}
}
}
return out
}
// opFilterValues keeps dicts where every rule matches.
func (d *DataOperationsComponent) opFilterValues(state *runtime.CanvasState, items []map[string]any) []any {
rules := d.param.FilterValues
out := make([]any, 0, len(items))
for _, obj := range items {
if len(rules) == 0 {
out = append(out, obj)
continue
}
all := true
for _, rule := range rules {
if !matchRule(state, obj, rule) {
all = false
break
}
}
if all {
out = append(out, obj)
}
}
return out
}
// matchRule evaluates one filter rule against obj. Mirrors the
// Python match_rule helper.
func matchRule(state *runtime.CanvasState, obj map[string]any, rule map[string]any) bool {
key, _ := rule["key"].(string)
if _, ok := obj[key]; !ok {
return false
}
op := strings.ToLower(asString(rule["operator"]))
if op == "" {
op = "equals"
}
target := normValue(rule["value"])
// Try to resolve {{...}} in target via state.
if s, ok := rule["value"].(string); ok && strings.Contains(s, "{{") {
if resolved, err := runtime.ResolveTemplate(s, state); err == nil {
target = resolved
}
}
v := normValue(obj[key])
switch op {
case "=", "equals":
return v == target
case "≠", "!=":
return v != target
case "contains":
return strings.Contains(v, target)
case "start with":
return strings.HasPrefix(v, target)
case "end with":
return strings.HasSuffix(v, target)
}
return false
}
// asString is a forgiving cast for params that may arrive as int/str.
func asString(v any) string {
if s, ok := v.(string); ok {
return s
}
return fmt.Sprintf("%v", v)
}
// opAppendOrUpdate copies each dict and applies updates. Values that
// look like {{ref}} are resolved via state; otherwise used as-is.
func (d *DataOperationsComponent) opAppendOrUpdate(state *runtime.CanvasState, items []map[string]any) []any {
out := make([]any, 0, len(items))
for _, obj := range items {
cp := make(map[string]any, len(obj))
for k, v := range obj {
cp[k] = v
}
for _, upd := range d.param.Updates {
k := strings.TrimSpace(asString(upd["key"]))
if k == "" {
continue
}
raw := upd["value"]
// Resolve {{...}} templates first; fall back to plain
// state-ref resolution (matches the Python
// get_value_with_variable behavior — strings are looked
// up in state when they look like refs).
if s, ok := raw.(string); ok {
if strings.Contains(s, "{{") {
if resolved, err := runtime.ResolveTemplate(s, state); err == nil && resolved != "" {
cp[k] = resolved
continue
}
}
if v, err := state.GetVar(s); err == nil && v != nil {
cp[k] = v
continue
}
}
cp[k] = raw
}
out = append(out, cp)
}
return out
}
// opRemoveKeys copies each dict and drops the listed keys.
func (d *DataOperationsComponent) opRemoveKeys(items []map[string]any) []any {
out := make([]any, 0, len(items))
for _, obj := range items {
cp := make(map[string]any, len(obj))
for k, v := range obj {
cp[k] = v
}
for _, k := range d.param.RemoveKeys {
if _, ok := cp[k]; ok {
delete(cp, k)
}
}
out = append(out, cp)
}
return out
}
// opRenameKeys copies each dict and renames per the configured pairs.
func (d *DataOperationsComponent) opRenameKeys(items []map[string]any) []any {
out := make([]any, 0, len(items))
for _, obj := range items {
cp := make(map[string]any, len(obj))
for k, v := range obj {
cp[k] = v
}
for _, pair := range d.param.RenameKeys {
old := strings.TrimSpace(asString(pair["old_key"]))
new := strings.TrimSpace(asString(pair["new_key"]))
if old == "" || new == "" || old == new {
continue
}
if v, ok := cp[old]; ok {
cp[new] = v
delete(cp, old)
}
}
out = append(out, cp)
}
return out
}
func init() {
Register(componentNameDataOperations, NewDataOperationsComponent)
}