ragflow/internal/agent/component/fixture_stubs.go
Zhichang Yu 3f805a64f1 feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)
## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.

The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.

### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.

### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.

### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
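
To make the `{{cpn_id@param}}` resolution model concrete, here is a hedged sketch of what a regex fast path might look like. It is not the `ResolveTemplate` implementation: the pattern, the flat `map[string]string` variable store, the `resolve` name, and the choice to fail on unresolved references are all assumptions, and the gonja fallback is omitted entirely.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches references such as {{llm:0@content}} or {{sys.query}}.
// The accepted character set is an assumption for this sketch.
var refPattern = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_:.@-]+)\s*\}\}`)

// resolve substitutes every reference found in tmpl with its value from
// vars. References without a value are collected and reported as an error
// (an illustrative policy, not necessarily the runtime's).
func resolve(tmpl string, vars map[string]string) (string, error) {
	var missing []string
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			missing = append(missing, key)
			return m
		}
		return v
	})
	if len(missing) > 0 {
		return "", fmt.Errorf("unresolved template refs: %v", missing)
	}
	return out, nil
}

func main() {
	vars := map[string]string{
		"sys.query":     "What is RAG?",
		"llm:0@content": "Retrieval-augmented generation.",
	}
	out, err := resolve("Q: {{sys.query}} A: {{ llm:0@content }}", vars)
	fmt.Println(out, err)

	_, err = resolve("{{missing:0@value}}", vars)
	fmt.Println(err)
}
```

A single-pass regex replacement covers plain variable references cheaply; templates that need conditionals or filters would have to go through a full template engine, which is presumably the role of the gonja fallback.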

## Out of scope (intentionally)

- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
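
The SSE framing asserted by the handler test (`data: {…}\n\n` frames followed by a trailing `data: [DONE]\n\n`) can be sketched with the standard library alone. The payload field names and the `writeSSE` / `render` helpers below are hypothetical; only the frame layout is taken from the description above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// writeSSE writes each payload as one "data: <json>\n\n" frame and then
// the "data: [DONE]\n\n" terminator.
func writeSSE(w io.Writer, payloads []map[string]any) error {
	for _, p := range payloads {
		b, err := json.Marshal(p)
		if err != nil {
			return err
		}
		if _, err := fmt.Fprintf(w, "data: %s\n\n", b); err != nil {
			return err
		}
	}
	_, err := io.WriteString(w, "data: [DONE]\n\n")
	return err
}

// render collects the framed stream into a string for inspection.
func render(payloads []map[string]any) (string, error) {
	var buf bytes.Buffer
	if err := writeSSE(&buf, payloads); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	// The "event" and "content" keys are illustrative field names.
	out, err := render([]map[string]any{
		{"event": "messages", "content": "hi"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

In a real handler the writer would be the `http.ResponseWriter` with `Content-Type: text/event-stream` set and a flush after each frame; that wiring is left out here.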

`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00


//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package component — e2e fixture stubs and compat shims.
//
// The test fixtures under internal/agent/dsl/testdata reference
// seven component names that are registered here: Retrieval,
// TavilySearch, ExeSQL, Generate, Answer, Iteration,
// IterationItem. Their bodies are deliberately trivial — they
// echo a stable, template-friendly output shape and never call
// the network or DB. The contract is "registered, non-panicking,
// and produces outputs downstream templates can resolve", not
// "do something useful". The Universe A wrappers in
// universe_a_wrappers.go and the real production bodies in
// their own .go files replace these stubs in production paths.
//
// The seven names were chosen by enumerating the component_name
// values in the testdata fixtures (see the `examples` var in
// internal/agent/canvas/dsl_examples_test.go). Keeping the list
// in sync with the fixture set is a single-source-of-truth
// discipline: if a new fixture references a name not in this file,
// the e2e test's compile+invoke loop will surface the gap with a
// clear factory error.
package component
import (
"context"
"fmt"
"ragflow/internal/agent/runtime"
)
// ----- Retrieval -----
const componentNameRetrieval = "Retrieval"
// RetrievalStub is a fixture stub for the Retrieval component. It
// returns an empty `formalized_content` so downstream templates
// that reference `{retrieval:0@formalized_content}` resolve to an
// empty string. The real production component (Dealer / KGSearch
// path) is registered as newRetrievalComponent in
// universe_a_wrappers.go and is the body that actually runs in
// production.
type RetrievalStub struct{}
// NewRetrievalStub constructs a Retrieval stub. params is accepted
// for API parity but unused at this stage (the real component
// parses kb_ids / similarity_threshold / top_n from it).
func NewRetrievalStub(_ map[string]any) (Component, error) {
return &RetrievalStub{}, nil
}
// Name returns the registered component name.
func (r *RetrievalStub) Name() string { return componentNameRetrieval }
// Invoke returns a stub result that downstream templates can
// resolve. `formalized_content` is the field the test fixtures
// reference; empty string is the safe fixture value.
func (r *RetrievalStub) Invoke(_ context.Context, _ map[string]any) (map[string]any, error) {
return map[string]any{"formalized_content": ""}, nil
}
// Stream mirrors Invoke as a single-chunk stream on a buffered channel.
func (r *RetrievalStub) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
out, err := r.Invoke(ctx, inputs)
if err != nil {
return nil, err
}
ch := make(chan map[string]any, 1)
ch <- out
close(ch)
return ch, nil
}
// Inputs returns the DSL param surface.
func (r *RetrievalStub) Inputs() map[string]string {
return map[string]string{
"kb_ids": "Knowledge base IDs to search over.",
"similarity_threshold": "Minimum vector similarity to include a chunk.",
"keywords_similarity_weight": "BM25 vs vector blend factor (0 = pure vector, 1 = pure BM25).",
"top_n": "Number of top chunks to keep after rerank.",
"top_k": "Number of candidates to retrieve before rerank.",
"rerank_id": "Optional rerank model identifier.",
"empty_response": "Fallback message when no chunks pass the threshold.",
}
}
// Outputs returns the public output surface.
func (r *RetrievalStub) Outputs() map[string]string {
return map[string]string{
"formalized_content": "Rendered chunks for downstream LLM prompts.",
}
}
// ----- TavilySearch -----
const componentNameTavilySearch = "TavilySearch"
// TavilySearchStub is a fixture stub for the TavilySearch tool. The
// real implementation (see internal/agent/tool/tavily.go) calls
// the Tavily HTTP API; this stub returns an empty result so the
// canvas e2e flow runs without network access.
type TavilySearchStub struct{}
// NewTavilySearchStub constructs a TavilySearch stub.
func NewTavilySearchStub(_ map[string]any) (Component, error) {
return &TavilySearchStub{}, nil
}
// Name returns the registered component name.
func (t *TavilySearchStub) Name() string { return componentNameTavilySearch }
// Invoke returns an empty `formalized_content` so downstream
// templates resolve.
func (t *TavilySearchStub) Invoke(_ context.Context, _ map[string]any) (map[string]any, error) {
return map[string]any{"formalized_content": ""}, nil
}
// Stream mirrors Invoke.
func (t *TavilySearchStub) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
out, err := t.Invoke(ctx, inputs)
if err != nil {
return nil, err
}
ch := make(chan map[string]any, 1)
ch <- out
close(ch)
return ch, nil
}
// Inputs returns the DSL param surface.
func (t *TavilySearchStub) Inputs() map[string]string {
return map[string]string{
"api_key": "Tavily API key.",
"query": "Search query template (may reference {iterationitem:0@result}).",
}
}
// Outputs returns the public output surface.
func (t *TavilySearchStub) Outputs() map[string]string {
return map[string]string{
"formalized_content": "Rendered search results for downstream LLM prompts.",
}
}
// ----- ExeSQL -----
const componentNameExeSQL = "ExeSQL"
const componentNameCodeExec = "CodeExec"
// ExeSQLStub is a fixture stub for the ExeSQL component. The real
// implementation (see internal/agent/tool/exesql.go) opens a MySQL
// connection and runs the user's SQL; this stub returns a fixed
// two-column schema so the e2e flow runs without a database.
type ExeSQLStub struct{}
// NewExeSQLStub constructs an ExeSQL stub.
func NewExeSQLStub(_ map[string]any) (Component, error) {
return &ExeSQLStub{}, nil
}
// Name returns the registered component name.
func (e *ExeSQLStub) Name() string { return componentNameExeSQL }
// Invoke returns a stable two-column stub result. Downstream
// templates that render SQL output will see headers + an empty row
// — enough for the message surface to format a string.
func (e *ExeSQLStub) Invoke(_ context.Context, _ map[string]any) (map[string]any, error) {
return map[string]any{
"columns": []string{"col1", "col2"},
"rows": [][]any{{"", ""}},
"sql": "",
}, nil
}
// Stream mirrors Invoke.
func (e *ExeSQLStub) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
out, err := e.Invoke(ctx, inputs)
if err != nil {
return nil, err
}
ch := make(chan map[string]any, 1)
ch <- out
close(ch)
return ch, nil
}
// Inputs returns the DSL param surface.
func (e *ExeSQLStub) Inputs() map[string]string {
return map[string]string{
"database": "Database / schema name.",
"username": "DB user.",
"host": "DB host.",
"port": "DB port.",
"password": "DB password.",
"top_n": "Limit on rows returned.",
}
}
// Outputs returns the public output surface.
func (e *ExeSQLStub) Outputs() map[string]string {
return map[string]string{
"columns": "Result-set column names.",
"rows": "Result-set rows (matrix form).",
"sql": "Resolved SQL string.",
}
}
// ----- Generate -----
const componentNameGenerate = "Generate"
// GenerateStub is a fixture stub for the legacy "Generate"
// component name. The Python DSL used "Generate" for a
// non-tool-using chat call; the Go port renamed the canonical
// name to "LLM" (see llm.go) and registers "Generate" here as a
// thin alias that routes to the LLM factory. Test fixtures that
// still reference the old name compile and run identically to
// LLM-backed flows.
type GenerateStub struct {
inner *LLMComponent
}
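// NewGenerateStub constructs a Generate stub. params is converted by
// buildLLMParamFromV1Params and passed to the LLM constructor. Only the
// keys that converter maps (llm_id, prompt, temperature, max_tokens,
// api_key, base_url) reach the LLM component; message_history_window_size
// and cite are listed in Inputs() but are not forwarded here.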
func NewGenerateStub(params map[string]any) (Component, error) {
llmParams, err := buildLLMParamFromV1Params(params)
if err != nil {
return nil, fmt.Errorf("Generate: %w", err)
}
return &GenerateStub{inner: NewLLMComponent(llmParams)}, nil
}
// Name returns the registered component name.
func (g *GenerateStub) Name() string { return componentNameGenerate }
// Invoke delegates to the LLM component.
func (g *GenerateStub) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) {
return g.inner.Invoke(ctx, inputs)
}
// Stream delegates to the LLM component.
func (g *GenerateStub) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
return g.inner.Stream(ctx, inputs)
}
// Inputs returns the DSL param surface. Matches LLM's surface
// plus the v1-only message_history_window_size and cite.
func (g *GenerateStub) Inputs() map[string]string {
return map[string]string{
"llm_id": "LLM model identifier.",
"prompt": "System / user prompt template.",
"temperature": "Sampling temperature (0 = greedy).",
"message_history_window_size": "How many prior turns to include.",
"cite": "Whether to include source citations in the output.",
}
}
// Outputs returns the public output surface.
func (g *GenerateStub) Outputs() map[string]string {
return map[string]string{
"content": "Assistant text response.",
"model": "Resolved model identifier.",
"tokens": "Token count for the call.",
}
}
// buildLLMParamFromV1Params converts the test-fixture Generate params shape
// into the LLMParam shape. Legacy fixtures store the user prompt under "prompt"
// (not "user_prompt"), and the system prompt is sometimes empty (the
// system role is often folded into "prompt"). We map: prompt →
// UserPrompt, llm_id → ModelID, temperature → Temperature,
// max_tokens → MaxTokens, base_url → BaseURL, api_key → APIKey.
// Keys that are missing or have an unexpected type are skipped, so
// the returned error is currently always nil.
func buildLLMParamFromV1Params(p map[string]any) (LLMParam, error) {
out := LLMParam{}
if v, ok := p["llm_id"].(string); ok {
out.ModelID = v
}
if v, ok := p["prompt"].(string); ok {
out.UserPrompt = v
}
if v, ok := p["temperature"].(float64); ok {
out.Temperature = &v
}
if v, ok := p["max_tokens"].(float64); ok {
i := int(v)
out.MaxTokens = &i
}
if v, ok := p["api_key"].(string); ok {
out.APIKey = v
}
if v, ok := p["base_url"].(string); ok {
out.BaseURL = v
}
return out, nil
}
// ----- Answer -----
const componentNameAnswer = "Answer"
// AnswerStub is a fixture stub for the Answer component. Answer
// is the agent's "wait for user" node (it pairs with ExeSQL or
// Message in conversational flows). The real implementation
// pauses the run and resumes on user input via the eino
// interrupt path (see canvas/interrupt_resume.go); the stub
// returns an empty answer immediately so the e2e flow can
// complete.
type AnswerStub struct{}
// NewAnswerStub constructs an Answer stub.
func NewAnswerStub(_ map[string]any) (Component, error) {
return &AnswerStub{}, nil
}
// Name returns the registered component name.
func (a *AnswerStub) Name() string { return componentNameAnswer }
// Invoke returns an empty answer. The real implementation blocks
// until the user provides input; the stub returns immediately so
// the e2e flow doesn't deadlock.
func (a *AnswerStub) Invoke(ctx context.Context, _ map[string]any) (map[string]any, error) {
// We don't read any values from the state, but we do require a
// CanvasState on the context so a mis-wired run fails here with
// a clear error.
if _, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx); err != nil {
return nil, fmt.Errorf("Answer: %w", err)
}
return map[string]any{"answer": ""}, nil
}
// Stream mirrors Invoke.
func (a *AnswerStub) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
out, err := a.Invoke(ctx, inputs)
if err != nil {
return nil, err
}
ch := make(chan map[string]any, 1)
ch <- out
close(ch)
return ch, nil
}
// Inputs returns the DSL param surface.
func (a *AnswerStub) Inputs() map[string]string {
return map[string]string{
"question": "Optional clarification question to surface to the user.",
}
}
// Outputs returns the public output surface.
func (a *AnswerStub) Outputs() map[string]string {
return map[string]string{
"answer": "User's response text.",
}
}
// ----- Iteration (alias to Parallel) / IterationItem (trivial stub) -----
// componentNameIteration / componentNameIterationItem are the legacy
// v1 names that the front-end may still emit. The Go port's runtime
// uses "Parallel" for the same concept, so IterationStub aliases
// through to the real Parallel factory, while IterationItemStub is a
// trivial stub with a fixed output. Any caller that bypasses
// dsl.NormalizeForCanvas (and so still sees the legacy names) gets
// Parallel behaviour for "Iteration" for free; the canonical path
// through the decoder boundary uses dsl.NormalizeForCanvas's fold
// step (see internal/agent/dsl/normalize.go) to rewrite
// "Iteration" -> "Parallel" and drop LoopItem/IterationItem nodes
// before they reach the registry.
const (
componentNameIteration = "Iteration"
componentNameIterationItem = "IterationItem"
)
// IterationStub delegates every call to the real Parallel
// component. Existence of this type is purely for registry
// compatibility (the v1 "Iteration" name must resolve) and for
// test introspection (RegisteredNames() still returns "Iteration"
// so parallel_test.go assertions do not churn).
type IterationStub struct {
inner Component
}
// NewIterationStub constructs an Iteration alias by routing to the
// Parallel factory. Any error from the inner factory is surfaced
// unchanged.
func NewIterationStub(params map[string]any) (Component, error) {
inner, err := New(componentNameParallel, params)
if err != nil {
return nil, err
}
return &IterationStub{inner: inner}, nil
}
// Name returns the registered (legacy) component name.
func (i *IterationStub) Name() string { return componentNameIteration }
// Invoke delegates to the inner Parallel component.
func (i *IterationStub) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) {
return i.inner.Invoke(ctx, inputs)
}
// Stream delegates to the inner Parallel component.
func (i *IterationStub) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
return i.inner.Stream(ctx, inputs)
}
// Inputs mirrors Parallel's surface for introspection.
func (i *IterationStub) Inputs() map[string]string {
return i.inner.Inputs()
}
// Outputs mirrors Parallel's surface for introspection.
func (i *IterationStub) Outputs() map[string]string {
return i.inner.Outputs()
}
// IterationItemStub is a fixture stub for the body node of an
// Iteration. The real wiring (parent_id → child routing) is
// engine-side; the stub itself ignores its inputs and returns a
// fixed empty result.
type IterationItemStub struct{}
// NewIterationItemStub constructs an IterationItem stub.
func NewIterationItemStub(_ map[string]any) (Component, error) {
return &IterationItemStub{}, nil
}
// Name returns the registered component name.
func (it *IterationItemStub) Name() string { return componentNameIterationItem }
// Invoke ignores its inputs and returns a map with an empty `result`.
func (it *IterationItemStub) Invoke(_ context.Context, _ map[string]any) (map[string]any, error) {
return map[string]any{"result": ""}, nil
}
// Stream mirrors Invoke.
func (it *IterationItemStub) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
out, err := it.Invoke(ctx, inputs)
if err != nil {
return nil, err
}
ch := make(chan map[string]any, 1)
ch <- out
close(ch)
return ch, nil
}
// Inputs returns the DSL param surface.
func (it *IterationItemStub) Inputs() map[string]string {
return map[string]string{
"item": "The current iteration item, injected by the Iteration parent.",
}
}
// Outputs returns the public output surface.
func (it *IterationItemStub) Outputs() map[string]string {
return map[string]string{
"result": "Body result for the current item.",
}
}
// ----- registrations -----
// One init per file keeps the registrations grouped and visible.
// Each Register call panics on a duplicate (the registry enforces
// uniqueness), so accidental double-registration in a later refactor
// surfaces as a panic at init time, not as a silent override.
func init() {
// Primary registration: Retrieval and ExeSQL go through the
// Universe A delegation wrappers in universe_a_wrappers.go
// (real eino tool plumbing). The stubs remain available for
// unit tests that want to assert the "no service wired" path
// via a direct constructor.
Register(componentNameRetrieval, newRetrievalComponent)
// The Python-side agent canvas uses both a PascalCase
// "SearchMyDataset" and the original snake_case typo
// "search_my_dateset"; an intermediate "search_my_dataset" form
// also exists in some legacy DSLs. The Universe B tool registry
// accepts all three (see internal/agent/tool/registry.go);
// Universe A must mirror that surface or older DSLs fail with
// "unknown component" at buildNodeBody time. Together with the
// canonical "Retrieval" name, all four registrations resolve to
// the same wrapper, so swapping the factory in the future only
// needs to happen once.
Register("SearchMyDataset", newRetrievalComponent)
Register("search_my_dataset", newRetrievalComponent)
Register("search_my_dateset", newRetrievalComponent)
Register(componentNameTavilySearch, NewTavilySearchStub)
Register(componentNameExeSQL, newExeSQLComponent)
Register(componentNameCodeExec, newCodeExecComponent)
Register(componentNameGenerate, NewGenerateStub)
Register(componentNameAnswer, NewAnswerStub)
Register(componentNameIteration, NewIterationStub)
Register(componentNameIterationItem, NewIterationItemStub)
}