Files
ragflow/internal/agent/component/string_transform.go

303 lines
9.4 KiB
Go
Raw Permalink Normal View History

feat(agent): Go port — canvas engine, 22 components, DSL v2, 13 endpoints (#15952) Ports the agent canvas subsystem from Python to Go. ## What's included ### Canvas Engine (Phase 0/1) - State engine, scheduler, variable resolver, Redis checkpoint store, cancel protocol - **209 tests** across canvas / component / io packages ### 22 Components (P0–P4) | Tier | Components | |---|---| | P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin, Message, Invoke | | P1 T3 | VariableAggregator, VariableAssigner, StringTransform, ListOperations, DataOperations | | P2 T3 | Iteration, IterationItem, Loop, LoopItem | | P3 T3 | UserFillUp, Fillup | | P4 T5 | Browser, ExcelProcessor, DocsGenerator | ### DSL v2 Schema (Phase 2.5) - Typed v2 in-memory model with v1-to-v2 auto-detect converter - v1 legacy field stripping per plan §2.11.7 ### HTTP Endpoints & Bug Fixes (Plans PR1–PR3) - **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)` pattern - **CreateAgent validation**: title/DSL required, duplicate check, 103 envelope - **13 new endpoints**: templates, prompts, tags, sessions CRUD, chat/completions (SSE + non-stream stubs), rerun, test_db_connection, logs, webhook/logs - **756 Go unit tests** (745 → 756, +18) - **17 → 0 Python integration test failures** (test_agents.py + test_session_management/) ### Tools 21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory stubs ### Infrastructure OTel observability, NATS message queue, DeepDoc gRPC client, SSRF guards, IDOR mitigation
2026-06-12 22:58:28 +08:00
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package component — StringTransform (T3, plan §2.11.3 row 18).
//
// StringTransform has two modes:
//
// split — break a string on one or more literal delimiters
// merge — substitute {{name}} placeholders in a script with values
// pulled from the inputs map or the canvas state
//
// Mirrors agent/component/string_transform.py. The P1 port supports the
// common {{name}} placeholder shape only; the full Jinja2 surface
// (`{% if %}`, `{% for %}`) is deferred to a later phase per the plan.
package component
import (
"context"
"fmt"
"regexp"
"strings"
"ragflow/internal/agent/runtime"
)
const componentNameStringTransform = "StringTransform"
// stringTransformParam is the static configuration of a StringTransform
// component. It is filled from the DSL param map by Update and validated
// by Check; AsDict converts it back into a plain map.
//
// Note that Check requires Delimiters to be non-empty for every method,
// not only for "split".
type stringTransformParam struct {
Method string `json:"method"` // "split" or "merge"; Update falls back to "split" when unset
Script string `json:"script"` // merge mode: template containing {{name}} placeholders
SplitRef string `json:"split_ref"` // split mode: state ref to read when no direct "line" input is given
Delimiters []string `json:"delimiters"` // split mode: literal delimiters (regex-escaped before use)
}
// Update loads the receiver from a DSL param map.
//
// Field handling:
//   - "method": taken when it is a non-empty string, otherwise "split".
//   - "script", "split_ref": taken when they are strings, otherwise reset
//     to the empty string.
//   - "delimiters": a []any keeps only its string elements (other element
//     types are skipped); a []string is copied; a missing or nil value
//     leaves the current delimiters untouched; any other shape clears the
//     delimiters so that Check rejects the configuration.
//
// Delimiters are always stored in freshly allocated memory. The receiver's
// previous backing array is never written to, so earlier value copies of
// the struct and slices handed out by AsDict are not modified by a later
// Update call.
//
// A nil conf is accepted and behaves like an empty map. Update currently
// always returns nil.
func (p *stringTransformParam) Update(conf map[string]any) error {
	// Reading from a nil map is legal in Go and yields zero values, so a
	// nil conf needs no special casing.
	method, _ := conf["method"].(string)
	if method == "" {
		method = "split"
	}
	p.Method = method
	p.Script, _ = conf["script"].(string)
	p.SplitRef, _ = conf["split_ref"].(string)

	switch raw := conf["delimiters"].(type) {
	case nil:
		// Key absent (or explicit nil): keep whatever is configured.
	case []string:
		// Copy instead of reusing p.Delimiters[:0]: that backing array may
		// be shared with struct copies or AsDict results.
		p.Delimiters = append(make([]string, 0, len(raw)), raw...)
	case []any:
		kept := make([]string, 0, len(raw))
		for _, item := range raw {
			if text, ok := item.(string); ok {
				kept = append(kept, text)
			}
		}
		p.Delimiters = kept
	default:
		// Unknown shape: treat as empty; Check will reject it.
		p.Delimiters = nil
	}
	return nil
}
// Check reports whether the configuration is usable. It returns a
// *ParamError naming the offending field when the method is neither
// "split" nor "merge", or when no delimiter is configured; otherwise nil.
func (p *stringTransformParam) Check() error {
	methodOK := p.Method == "split" || p.Method == "merge"
	if !methodOK {
		return &ParamError{Field: "method", Reason: "must be one of: split, merge"}
	}
	// Delimiters are mandatory regardless of the selected method.
	if len(p.Delimiters) < 1 {
		return &ParamError{Field: "delimiters", Reason: "must not be empty"}
	}
	return nil
}
// AsDict exposes the configuration as a plain map keyed by the same names
// the DSL uses. The "delimiters" entry is the receiver's own slice, not a
// copy.
func (p *stringTransformParam) AsDict() map[string]any {
	dict := make(map[string]any, 4)
	dict["method"] = p.Method
	dict["script"] = p.Script
	dict["split_ref"] = p.SplitRef
	dict["delimiters"] = p.Delimiters
	return dict
}
// placeholderPattern matches {{name}} where name is an identifier-like
// sequence (a letter or underscore followed by letters, digits or
// underscores). Whitespace between the braces and the name is tolerated,
// so "{{ name }}" matches as well; the single capture group holds the
// bare name.
//
// Intentionally narrower than the canvas var-ref pattern
// (which also handles sys.x / env.x) because merge placeholders are
// looked up in the inputs map and/or canvas state by simple key, not
// the full cpn_id@param / sys.x / env.x grammar.
var placeholderPattern = regexp.MustCompile(`\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}`)
// StringTransformComponent implements the split/merge component.
// Instances are built by NewStringTransformComponent, which stores a
// validated copy of the parameters.
type StringTransformComponent struct {
name string // registered component name, returned by Name
param stringTransformParam // validated configuration (method, script, split_ref, delimiters)
}
// NewStringTransformComponent builds a StringTransform component from the
// DSL param map. The params are loaded with Update and validated with
// Check; a failure in either step is returned wrapped with a
// "StringTransform: ..." prefix and no component is produced.
func NewStringTransformComponent(params map[string]any) (Component, error) {
	var cfg stringTransformParam
	if err := cfg.Update(params); err != nil {
		return nil, fmt.Errorf("StringTransform: param update: %w", err)
	}
	if err := cfg.Check(); err != nil {
		return nil, fmt.Errorf("StringTransform: param check: %w", err)
	}

	comp := &StringTransformComponent{
		name:  componentNameStringTransform,
		param: cfg,
	}
	return comp, nil
}
// Name reports the component name stored at construction time.
func (s *StringTransformComponent) Name() string {
	return s.name
}
// Invoke executes the configured method and returns a map whose "result"
// entry holds the transformed payload.
//
// The canvas state is fetched from ctx; an error from that lookup, or a
// nil state, aborts the call with a "StringTransform: ..." error. Method
// "split" dispatches to doSplit, every other method to doMerge.
func (s *StringTransformComponent) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) {
	state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx)
	switch {
	case err != nil:
		return nil, fmt.Errorf("StringTransform: %w", err)
	case state == nil:
		return nil, fmt.Errorf("StringTransform: nil canvas state")
	}

	if s.param.Method != "split" {
		merged := s.doMerge(ctx, state, inputs)
		return merged, nil
	}
	return s.doSplit(ctx, state, inputs)
}
// Stream is the streaming counterpart of Invoke. StringTransform produces
// its output in one shot, so the returned channel carries exactly one
// value (the Invoke result) and is already closed when handed back. An
// Invoke error is returned as-is with a nil channel.
func (s *StringTransformComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
	result, err := s.Invoke(ctx, inputs)
	if err != nil {
		return nil, err
	}

	// Capacity 1 lets the single send complete without a receiver.
	stream := make(chan map[string]any, 1)
	defer close(stream)
	stream <- result
	return stream, nil
}
// Inputs describes the parameters the component accepts, keyed by input
// name. In split mode that is the single optional "line" input; in any
// other mode there is one entry per distinct {{name}} placeholder found
// in the configured script.
func (s *StringTransformComponent) Inputs() map[string]string {
	if s.param.Method != "split" {
		// merge: the surface is derived from the script's placeholders.
		placeholders := extractPlaceholders(s.param.Script)
		described := make(map[string]string, len(placeholders))
		for i := range placeholders {
			key := placeholders[i]
			described[key] = "Value to substitute for {{" + key + "}} (drawn from inputs or state)."
		}
		return described
	}

	surface := map[string]string{}
	surface["line"] = "Optional direct string to split; if absent, the component reads state[split_ref]."
	return surface
}
// Outputs describes the single "result" output produced by both methods.
func (s *StringTransformComponent) Outputs() map[string]string {
	described := make(map[string]string, 1)
	described["result"] = "Split: a []string of kept tokens. Merge: a single string with placeholders resolved."
	return described
}
// doSplit implements the split method and returns {"result": []string}.
//
// Input selection: a non-empty string in inputs["line"] wins; otherwise,
// when split_ref is configured, the value is read from the canvas state
// (nil is treated as the empty string, a non-string value is an error);
// otherwise the empty string is split.
//
// Splitting: the delimiters are regex-escaped, joined into one alternation
// and matched against the input. The text between consecutive matches is
// kept and the matched delimiters are dropped. With no match at all, the
// whole input is returned as the only token. According to the original
// port notes this reproduces the Python _split helper (re.split with a
// capture group, keeping the even-indexed tokens).
func (s *StringTransformComponent) doSplit(_ context.Context, state *runtime.CanvasState, inputs map[string]any) (map[string]any, error) {
	text := ""
	// A failed assertion leaves direct == "", which is handled exactly
	// like an absent or empty "line" input.
	direct, _ := inputs["line"].(string)
	switch {
	case direct != "":
		text = direct
	case s.param.SplitRef != "":
		raw, err := state.GetVar(s.param.SplitRef)
		if err != nil {
			return nil, fmt.Errorf("StringTransform: split_ref %q: %w", s.param.SplitRef, err)
		}
		switch val := raw.(type) {
		case nil:
			// Unset variable: split the empty string.
		case string:
			text = val
		default:
			return nil, fmt.Errorf("StringTransform: split input is not a string: %T", raw)
		}
	}

	// Equivalent of "|".join(re.escape(d) for d in delimiters).
	escaped := make([]string, len(s.param.Delimiters))
	for i, delim := range s.param.Delimiters {
		escaped[i] = regexp.QuoteMeta(delim)
	}
	splitter, err := regexp.Compile("(?s)(" + strings.Join(escaped, "|") + ")")
	if err != nil {
		return nil, fmt.Errorf("StringTransform: bad delimiter pattern: %w", err)
	}

	// Collect the stretches of text that lie between delimiter matches.
	spans := splitter.FindAllStringIndex(text, -1)
	tokens := make([]string, 0, len(spans)+1)
	cursor := 0
	for _, span := range spans {
		start, end := span[0], span[1]
		tokens = append(tokens, text[cursor:start])
		cursor = end
	}
	tokens = append(tokens, text[cursor:])

	return map[string]any{"result": tokens}, nil
}
// doMerge runs the merge method. Mirrors the Python _merge helper
// (string_transform.py:93-112): collect {{name}} placeholders, resolve
// each from inputs (preferred) or canvas state, substitute, and emit
// the resolved script.
func (s *StringTransformComponent) doMerge(_ context.Context, state *runtime.CanvasState, inputs map[string]any) map[string]any {
script := s.param.Script
feat(agent): align Go agent behavior with Python (except retrieval component) (#16225) ## Summary Aligns the **Go agent runtime/canvas/components/tools** behavior with the **Python `agent/` implementation** so the same stored canvas DSL produces the same execution result on either side. Every component, tool, and runtime primitive in `internal/agent/` is now driven by the same semantics as its Python counterpart — variable resolution, template substitution, control flow, error reporting, retry/cancel, and stream event shapes. The **retrieval component is the one explicit exception** in this PR. It is being reworked in a separate change and is excluded from this alignment pass; the wrapper slot (`universe_a_wrappers.go → newRetrievalComponent`) is preserved. ## Scope of alignment ### Components (all aligned with `agent/component/`) `Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs, MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure, JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent` (react + tool artifact capture + `Reset()` interface-assert) · `Switch` (12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke` · `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`) · `UserFillUp` (Python-equivalent interrupt/resume via eino `compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` · `ListOperations` · `StringTransform` · `VariableAggregator` · `VariableAssigner` · `Browser` (full stagehand runtime parity) · `DocsGenerator` · `ExcelProcessor`. 
### Tools (all aligned with `agent/tools/`) `Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter` (streamable-HTTP) · `CodeExec` (sandbox bridge with `code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv` · `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` · `Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG` · `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` — uniform `eino tool.InvokableTool` interface, SSRF protection, shared HTTP client. ### Canvas execution engine (`internal/agent/canvas/`) Aligned with Python's `agent/canvas.py`: - **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas, per-component timeout resolver (4-level: per-class env → per-class table → uniform env → 600s fallback), `legacyNoOpNames`. - **Loop subgraph** (`loop_subgraph.go`): Python-equivalent `AddLoopNode` macro expansion + condition translation. - **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing via `compose.NewGraphMultiBranch` — same branch selection semantics as Python. - **Parallel subgraph** (`parallel_subgraph.go`): matches Python's parallel fan-out contract. - **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` / `IsInterruptError` / `ExtractInterruptContexts` — replaces the deprecated Python sentinel chain with eino's native interrupt API, preserving the same external behavior. - **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore` Get/Set/Delete, with business metadata (status / canvas_id / parent_run_id) on a parallel Redis Hash. - **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed / MarkCancelled / AttachCheckpoint — same lifecycle as the Python run record. - **Cancel** (`cancel.go`): Redis pub/sub watch. - **Stream** (`stream.go`): SSE channel with `messages` / `waiting` / `errors` / `done` events, same shape as Python's `agent.canvas.RunEvent` payload. 
### DSL bridge (`internal/agent/dsl/`) - `normalize.go`: v1↔v2 collapsed into a single wire format — Python and Go consume the same stored JSON. - `reset.go`: per-run state reset matches Python's `Canvas.reset()` semantics. - Testdata mirrors Python's `agent_msg.json` / `all.json` / etc. ### Runtime (`internal/agent/runtime/`) - `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`: same `{{cpn_id@param}}` resolution model. - `ResolveTemplate` (regex fast path + gonja fallback) — Python Jinja-style semantics. - `selector.go`, `metrics.go`, `component.go`: shared runtime contracts. ## Out of scope (intentionally) - **`Retrieval` component logic** — wrapped only; full parity lands in a follow-up PR. - **Frontend** — only minor dsl-bridge / canvas UX fixes ride along. - **CLI / admin / model registry** — orthogonal to agent behavior. ## How alignment is verified `internal/service/agent_run_e2e_test.go` exercises the **full production chain** against real Python-shaped DSL fixtures: ``` loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL → canvas.Compile → cc.Workflow.Invoke → answer extraction ``` using in-memory SQLite + miniredis (no Docker). Covers: - `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}` resolution - `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle (Python-equivalent) - `TestRunAgent_RealCanvas_CompileFails` — unknown component name → sanitized error (Python-equivalent) - `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref (Python-equivalent) - `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` — Start→AttachCheckpoint→MarkSucceeded lifecycle `internal/handler/agent_test.go` — SSE streaming parity (`Content-Type: text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`, OpenAI-compatible non-stream `choices`). `internal/agent/canvas/fixture_compile_test.go` + per-component tests pin the Python-equivalent outputs. 
``` go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/ ``` ## Design reference `docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked 2026-06-17) — module layout, per-component / per-tool inventory, corner-case catalogue, and the actionable backlog (Section 14, including the retrieval alignment follow-up). --------- Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00
// First pass: state-level template resolution for any runtime ref
// syntax the canvas supports, including single-brace legacy refs and
// iteration aliases like {item}/{index}.
if resolved, err := runtime.ResolveTemplateAuto(script, state); err == nil {
script = resolved
feat(agent): Go port — canvas engine, 22 components, DSL v2, 13 endpoints (#15952) Ports the agent canvas subsystem from Python to Go. ## What's included ### Canvas Engine (Phase 0/1) - State engine, scheduler, variable resolver, Redis checkpoint store, cancel protocol - **209 tests** across canvas / component / io packages ### 22 Components (P0–P4) | Tier | Components | |---|---| | P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin, Message, Invoke | | P1 T3 | VariableAggregator, VariableAssigner, StringTransform, ListOperations, DataOperations | | P2 T3 | Iteration, IterationItem, Loop, LoopItem | | P3 T3 | UserFillUp, Fillup | | P4 T5 | Browser, ExcelProcessor, DocsGenerator | ### DSL v2 Schema (Phase 2.5) - Typed v2 in-memory model with v1-to-v2 auto-detect converter - v1 legacy field stripping per plan §2.11.7 ### HTTP Endpoints & Bug Fixes (Plans PR1–PR3) - **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)` pattern - **CreateAgent validation**: title/DSL required, duplicate check, 103 envelope - **13 new endpoints**: templates, prompts, tags, sessions CRUD, chat/completions (SSE + non-stream stubs), rerun, test_db_connection, logs, webhook/logs - **756 Go unit tests** (745 → 756, +18) - **17 → 0 Python integration test failures** (test_agents.py + test_session_management/) ### Tools 21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory stubs ### Infrastructure OTel observability, NATS message queue, DeepDoc gRPC client, SSRF guards, IDOR mitigation
2026-06-12 22:58:28 +08:00
}
// Second pass: {{name}} placeholders → values from inputs, then state.
names := extractPlaceholders(script)
if len(names) == 0 {
return map[string]any{"result": script}
}
for _, n := range names {
placeholder := "{{" + n + "}}"
var value any
if v, ok := inputs[n]; ok {
value = v
} else if v, err := state.GetVar(n); err == nil && v != nil {
value = v
} else {
value = ""
}
script = strings.ReplaceAll(script, placeholder, fmt.Sprintf("%v", value))
}
return map[string]any{"result": script}
}
// extractPlaceholders lists the distinct placeholder names found in s,
// ordered by first appearance. It returns nil when s is empty or contains
// no placeholder.
func extractPlaceholders(s string) []string {
	found := placeholderPattern.FindAllStringSubmatch(s, -1)
	if len(found) == 0 {
		return nil
	}

	names := make([]string, 0, len(found))
	known := make(map[string]struct{}, len(found))
	for _, groups := range found {
		// groups[0] is the whole match, groups[1] the captured name.
		if len(groups) >= 2 {
			if _, repeated := known[groups[1]]; !repeated {
				known[groups[1]] = struct{}{}
				names = append(names, groups[1])
			}
		}
	}
	return names
}
// init registers the StringTransform constructor under
// componentNameStringTransform when the package is loaded. Register is
// defined elsewhere in this package — presumably the component registry
// the canvas uses to look up constructors by name; confirm there.
func init() {
Register(componentNameStringTransform, NewStringTransformComponent)
}