mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-01 00:05:43 +08:00
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Apart from the
retrieval exception below, every component, tool, and runtime primitive
in `internal/agent/` now follows the same semantics as its Python
counterpart: variable resolution, template substitution, control flow,
error reporting, retry/cancel, and stream event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(ReAct loop + tool artifact capture + `Reset()` interface assertion) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance`. All
tools expose the uniform eino `tool.InvokableTool` interface, with SSRF
protection and a shared HTTP client.
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
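A regex-only fast path for `{{cpn_id@param}}` and `{{sys.query}}`-style references might look like the sketch below. It is hypothetical. The pattern, the flat `map[string]any` variable store, and the choice to report the first unknown reference as an error (in the spirit of `TestRunAgent_RealCanvas_InvokeFails`) are all assumptions. The sketch also has no gonja fallback for Jinja-style constructs.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches {{ key }}, where key is e.g. "cpn_id@param" or "sys.query".
var refPattern = regexp.MustCompile(`\{\{\s*([^{}\s]+)\s*\}\}`)

// resolveRefs replaces every {{key}} in tmpl with fmt.Sprint(vars[key]).
// The first key missing from vars is reported as an error.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	var missing string
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		v, ok := vars[key]
		if !ok {
			if missing == "" {
				missing = key
			}
			return m
		}
		return fmt.Sprint(v)
	})
	if missing != "" {
		return "", fmt.Errorf("unresolved reference %q", missing)
	}
	return out, nil
}

func main() {
	vars := map[string]any{
		"sys.query":      "hello",
		"begin@greeting": "Hi",
	}
	s, err := resolveRefs("{{begin@greeting}}, you said: {{ sys.query }}", vars)
	fmt.Println(s, err) // Hi, you said: hello <nil>
	_, err = resolveRefs("{{llm_0@content}}", vars)
	fmt.Println(err) // unresolved reference "llm_0@content"
}
```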
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
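The run lifecycle that the last test pins down can be modelled as a small state machine. The `runTracker` type and its status values are hypothetical, as is the rule that a terminal state (succeeded, failed, cancelled) cannot be left again. They illustrate the Start → AttachCheckpoint → MarkSucceeded sequence; they do not reproduce `run_tracker.go`.

```go
package main

import (
	"errors"
	"fmt"
)

type runStatus string

const (
	statusNew       runStatus = "" // zero value: not started yet
	statusRunning   runStatus = "running"
	statusSucceeded runStatus = "succeeded"
	statusFailed    runStatus = "failed"
	statusCancelled runStatus = "cancelled"
)

var errBadTransition = errors.New("invalid run transition")

// runTracker records each accepted call so a test can assert the sequence.
type runTracker struct {
	status       runStatus
	checkpointID string
	calls        []string
}

func (r *runTracker) Start() error {
	if r.status != statusNew {
		return errBadTransition
	}
	r.status = statusRunning
	r.calls = append(r.calls, "Start")
	return nil
}

// AttachCheckpoint is only valid while the run is in progress.
func (r *runTracker) AttachCheckpoint(id string) error {
	if r.status != statusRunning {
		return errBadTransition
	}
	r.checkpointID = id
	r.calls = append(r.calls, "AttachCheckpoint")
	return nil
}

// finish moves a running run into exactly one terminal state.
func (r *runTracker) finish(to runStatus, name string) error {
	if r.status != statusRunning {
		return errBadTransition
	}
	r.status = to
	r.calls = append(r.calls, name)
	return nil
}

func (r *runTracker) MarkSucceeded() error { return r.finish(statusSucceeded, "MarkSucceeded") }
func (r *runTracker) MarkFailed() error    { return r.finish(statusFailed, "MarkFailed") }
func (r *runTracker) MarkCancelled() error { return r.finish(statusCancelled, "MarkCancelled") }

func main() {
	r := &runTracker{}
	_ = r.Start()
	_ = r.AttachCheckpoint("ckpt-1")
	_ = r.MarkSucceeded()
	// Prints: [Start AttachCheckpoint MarkSucceeded] succeeded
	fmt.Println(r.calls, r.status)
	// Prints: true (terminal states are final in this sketch)
	fmt.Println(r.MarkFailed() != nil)
}
```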
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
285 lines
9.0 KiB
Go
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package component: Invoke component (T3).
//
// Invoke is the canvas HTTP client node. It supports GET/POST/PUT/DELETE
// with custom headers, an optional proxy, and a per-request timeout. It
// wraps the underlying net/http.Transport with otelhttp.NewTransport (from
// go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp) so
// outbound calls automatically propagate W3C traceparent headers.
package component

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"time"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

const (
	componentNameInvoke = "Invoke"

	defaultInvokeTimeout   = 30 * time.Second
	defaultInvokeUserAgent = "ragflow-agent/1.0 (Invoke component)"
	defaultInvokeContentCT = "application/json"
	maxInvokeResponseBody  = 16 << 20 // 16 MiB; hard cap to avoid OOM
)

// InvokeComponent is the HTTP client node. Stateless across invocations.
type InvokeComponent struct {
	name string
}

// NewInvokeComponent constructs an Invoke component.
func NewInvokeComponent(_ map[string]any) (Component, error) {
	return &InvokeComponent{name: componentNameInvoke}, nil
}

// Name returns the registered component name.
func (i *InvokeComponent) Name() string { return i.name }

// Invoke executes a single HTTP request and returns the status code,
// body, response headers, and datatype. See Inputs() for the param
// contract.
func (i *InvokeComponent) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) {
	method, _ := inputs["method"].(string)
	method = strings.ToUpper(strings.TrimSpace(method))
	switch method {
	case http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete:
	default:
		return nil, fmt.Errorf("Invoke: invalid method %q (want GET/POST/PUT/DELETE)", method)
	}

	rawURL, _ := inputs["url"].(string)
	if rawURL == "" {
		return nil, errors.New("Invoke: url is required")
	}
	// url.Parse is a sanity check: the orchestrator is trusted to have
	// already resolved any {{...}} refs, but a bad string here is a
	// programmer error worth surfacing.
	if _, err := url.Parse(rawURL); err != nil {
		return nil, fmt.Errorf("Invoke: parse url: %w", err)
	}

	// timeout is given in seconds. DSL values decoded from JSON arrive as
	// float64, so scale before converting: time.Duration(0.5) would
	// truncate to 0, which http.Client treats as no timeout at all.
	timeout := defaultInvokeTimeout
	if v, ok := inputs["timeout"].(int); ok && v > 0 {
		timeout = time.Duration(v) * time.Second
	} else if v, ok := inputs["timeout"].(float64); ok && v > 0 {
		if d := time.Duration(v * float64(time.Second)); d > 0 {
			timeout = d
		}
	}

	contentType, _ := inputs["content_type"].(string)
	if contentType == "" && (method == http.MethodPost || method == http.MethodPut) {
		contentType = defaultInvokeContentCT
	}

	var body io.Reader
	if s, ok := inputs["body"].(string); ok && s != "" {
		body = strings.NewReader(s)
	}

	req, err := http.NewRequestWithContext(ctx, method, rawURL, body)
	if err != nil {
		return nil, fmt.Errorf("Invoke: build request: %w", err)
	}
	if contentType != "" {
		req.Header.Set("Content-Type", contentType)
	}
	req.Header.Set("User-Agent", defaultInvokeUserAgent)
	// Caller-supplied headers are applied last, so they override the
	// Content-Type and User-Agent defaults set above.
	if h, ok := inputs["headers"].(map[string]any); ok {
		for k, v := range h {
			if s, ok := v.(string); ok {
				req.Header.Set(k, s)
			}
		}
	}

	// Wrap the stdlib Transport with otelhttp so the request gets a
	// child span + W3C traceparent injected automatically.
	transport := otelhttp.NewTransport(http.DefaultTransport)
	// Optional proxy support: if inputs["proxy"] is set, build a
	// dedicated Transport that uses it. This avoids mutating the
	// global http.DefaultTransport (which would also affect unrelated
	// components in the same process). The proxy string arrives through
	// component inputs, so a malformed value is returned as an error
	// instead of panicking and taking down the whole process.
	if proxyStr, ok := inputs["proxy"].(string); ok && proxyStr != "" {
		proxyURL, err := parseProxyURL(proxyStr)
		if err != nil {
			return nil, err
		}
		transport = otelhttp.NewTransport(&http.Transport{
			Proxy: http.ProxyURL(proxyURL),
		})
	}

	client := &http.Client{
		Timeout:   timeout,
		Transport: transport,
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("Invoke: do: %w", err)
	}
	defer resp.Body.Close()

	// Cap the response body to keep a hostile server from streaming
	// infinite bytes into memory. Bytes past the cap are silently
	// dropped.
	limited := io.LimitReader(resp.Body, maxInvokeResponseBody)
	bodyBytes, err := io.ReadAll(limited)
	if err != nil {
		return nil, fmt.Errorf("Invoke: read body: %w", err)
	}

	hdr := make(map[string]string, len(resp.Header))
	for k, vs := range resp.Header {
		// First value only: multi-value headers are uncommon in
		// canvas-DSL HTTP responses, and the param contract specifies
		// a string map.
		if len(vs) > 0 {
			hdr[k] = vs[0]
		}
	}

	bodyStr := string(bodyBytes)

	// Strip HTML from the response body when clean_html is set.
	if cleanHTML, _ := inputs["clean_html"].(bool); cleanHTML {
		bodyStr = stripHTMLTags(bodyStr)
	}

	// Report the requested datatype, or infer it from the response
	// Content-Type when none was given. The body is returned as a
	// string either way; no JSON decoding happens here.
	datatype, _ := inputs["datatype"].(string)
	if datatype == "" {
		if strings.Contains(resp.Header.Get("Content-Type"), "application/json") {
			datatype = "json"
		} else {
			datatype = "text"
		}
	}

	return map[string]any{
		"status":   resp.StatusCode,
		"body":     bodyStr,
		"headers":  hdr,
		"datatype": datatype,
	}, nil
}

// Stream is a synchronous facade over Invoke: it performs the whole
// request, then emits the result as a single event. Real streaming
// (forwarding chunks as they arrive) is a future enhancement.
func (i *InvokeComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
	out, err := i.Invoke(ctx, inputs)
	if err != nil {
		return nil, err
	}
	ch := make(chan map[string]any, 1)
	ch <- out
	close(ch)
	return ch, nil
}

// Inputs returns the public parameter surface.
func (i *InvokeComponent) Inputs() map[string]string {
	return map[string]string{
		"method":       "HTTP method: GET, POST, PUT, or DELETE (case-insensitive).",
		"url":          "Target URL; can be a {{...}} reference resolved upstream.",
		"headers":      "Optional map of string headers.",
		"body":         "Optional request body (string).",
		"timeout":      "Per-request timeout in seconds; default 30.",
		"proxy":        "Optional proxy URL (e.g. http://host:3128).",
		"content_type": "Optional Content-Type; default 'application/json' for POST/PUT.",
		"clean_html":   "When true, strip HTML tags from the response body.",
		"datatype":     "Expected response datatype: 'json', 'text', or 'html'. If empty, inferred from the response Content-Type ('json' for application/json, otherwise 'text').",
		"variables":    "Optional template variables for URL/body interpolation.",
	}
}

// Outputs returns the response surface.
func (i *InvokeComponent) Outputs() map[string]string {
	return map[string]string{
		"status":   "HTTP status code (int).",
		"body":     "Response body (string, truncated at 16 MiB).",
		"headers":  "Response headers (first value per key).",
		"datatype": "Response datatype: the 'datatype' input if given, else inferred as 'json' or 'text'.",
	}
}

// parseProxyURL validates a proxy URL string. Besides url.Parse errors,
// it rejects URLs without a host: a scheme-less value such as
// "proxy:3128" parses with scheme "proxy" and an empty host, and should
// be reported here rather than handed to net/http.
func parseProxyURL(raw string) (*url.URL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, fmt.Errorf("Invoke: invalid proxy URL %q: %w", raw, err)
	}
	if u.Host == "" {
		return nil, fmt.Errorf("Invoke: proxy URL %q has no host", raw)
	}
	return u, nil
}

// stripHTMLTags removes HTML tags from the input string. This is a
// best-effort implementation: it repeatedly deletes each '<' ... '>'
// span, then collapses runs of blank lines. Only the tags are removed;
// the text inside <script> and <style> elements is kept, on its own
// lines. It is NOT a full HTML sanitizer and should only be used for
// cleaning up response text for consumption by downstream LLM nodes.
// Mirrors Python's `strip_html_tags` helper (invoke.py).
func stripHTMLTags(s string) string {
	// Put <script>/<style> blocks on their own lines so their contents
	// do not run into the surrounding text once the tags are gone.
	spacer := strings.NewReplacer(
		"<script", "\n<script",
		"</script>", "</script>\n",
		"<style", "\n<style",
		"</style>", "</style>\n",
	)
	s = spacer.Replace(s)
	for {
		start := strings.Index(s, "<")
		if start == -1 {
			break
		}
		end := strings.Index(s[start:], ">")
		if end == -1 {
			// Unterminated tag: leave the remainder as-is.
			break
		}
		s = s[:start] + s[start+end+1:]
	}
	// Collapse three or more consecutive newlines into two.
	for strings.Contains(s, "\n\n\n") {
		s = strings.ReplaceAll(s, "\n\n\n", "\n\n")
	}
	return strings.TrimSpace(s)
}

func init() {
	Register(componentNameInvoke, NewInvokeComponent)
}