Files
ragflow/internal/agent/component/message.go
Zhichang Yu 3f805a64f1 feat(agent): align Go agent behavior with Python (except retrieval component) (#16225)
## Summary

Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.

The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.

## Scope of alignment

### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.

### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.

### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.

### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.

### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.

## Out of scope (intentionally)

- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.

## How alignment is verified

`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle

`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).

`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.

```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```

## Design reference

`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-22 11:58:29 +08:00

470 lines
15 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package component — Message component (T3).
//
// Message is the canvas terminal output node. It resolves a
// Jinja2-style {{...}} template against the current *CanvasState
// and (optionally) emits the result as a single SSE chunk.
//
// Capabilities:
// - output_format rendering (html / markdown / plain) via render.go
// - auto_play → TTS engine dispatch via internal/agent/audio
// - download extraction from inputs (the {doc_id, filename,
// mime_type} walk from Python's _extract_downloads)
// - memory_save persistence via the registered MemorySaver
// (default stub returns ErrMemoryServiceMissing until a real
// implementation is wired at boot)
package component
import (
"context"
"fmt"
"log"
"maps"
"regexp"
"ragflow/internal/agent/audio"
"ragflow/internal/agent/runtime"
)
const componentNameMessage = "Message"
// MessageComponent is the canvas terminal output node. It owns
// the resolved text template as a per-instance field — the factory
// sets it from the DSL params at build time, and Invoke falls back
// to it when the input map does not carry a fresh "text" override.
//
// Per-instance format / TTS / memory config lets the build-time
// DSL declarations take effect without input-map plumbing.
type MessageComponent struct {
name string
text string
outputFormat OutputFormat
autoPlay audio.Engine
voice string
lang string
}
// NewMessageComponent constructs a Message component. The params map
// may carry:
//
// - "text" (string) — the canonical v2 name
// - "content" (string | []string | []any) — the v1 name
// - "output_format" (string) — "html" | "markdown" | "plain"
// - "auto_play" (bool | string) — TTS engine toggle
// (true → "gtts", string → that engine name)
// - "voice" (string) — TTS voice hint
// - "lang" (string) — TTS language tag
// - "memory_ids" ([]string | []any) — list of memory
// stores to persist into when memory_save=true
//
// At least one of text/content must produce a non-empty string;
// otherwise the node emits an empty content (it is the canvas
// terminal, so a runtime error would be louder than a missing
// template).
func NewMessageComponent(params map[string]any) (Component, error) {
tpl := extractMessageText(params)
format := OutputFormatPlain
if v, ok := params["output_format"].(string); ok {
format = OutputFormat(v)
}
engine, voice, lang := extractAudioConfig(params)
return &MessageComponent{
name: componentNameMessage,
text: tpl,
outputFormat: format,
autoPlay: engine,
voice: voice,
lang: lang,
}, nil
}
// extractAudioConfig reads auto_play / voice / lang from the
// params map. auto_play=true → EngineGTTS; auto_play="edge-tts"
// → EngineEdge; false/missing → EngineEmpty. The string form
// is preferred when the user typed a specific engine name.
func extractAudioConfig(params map[string]any) (audio.Engine, string, string) {
var engine audio.Engine
if v, ok := params["auto_play"]; ok {
switch x := v.(type) {
case bool:
if x {
engine = audio.EngineGTTS
}
case string:
engine = audio.Engine(x)
}
}
voice, _ := params["voice"].(string)
lang, _ := params["lang"].(string)
return engine, voice, lang
}
// extractMessageText reads text / content from params in the v1 / v2
// order documented on NewMessageComponent. Returns the empty string
// when neither key is present or the value is not a string-shaped
// scalar.
func extractMessageText(params map[string]any) string {
if v, ok := params["text"].(string); ok {
return v
}
if v, ok := params["content"]; ok {
switch x := v.(type) {
case string:
return x
case []string:
if len(x) > 0 {
return x[0]
}
case []any:
if len(x) > 0 {
if s, ok := x[0].(string); ok {
return s
}
}
}
}
return ""
}
// Name returns the registered component name.
func (m *MessageComponent) Name() string { return m.name }
// Invoke resolves inputs["text"] (or the per-instance text seeded
// from params at build time) as a template against the current
// *CanvasState, returns the resolved string at outputs["content"], and
// (if inputs["stream"] == true) records the number of chunks in
// outputs["streamed_chunks"].
//
// Message Invoke behaviour:
// - input-format override: inputs["output_format"] wins over the
// per-instance format so an orchestrator can re-render
// downstream
// - downloads: walks inputs for {doc_id, filename, mime_type}
// entries; sets outputs["downloads"] when any are present
// - auto_play: when m.autoPlay is non-empty, dispatches the
// resolved content through the registered audio.Synthesizer
// and surfaces base64 audio under outputs["audio"]
// - memory_save: when true, calls the registered MemorySaver
// with the resolved content (errors are surfaced but not fatal
// so a missing memory service does not break the message)
//
// inputs["text"] takes precedence over the per-instance text so the
// same node can be reused with different templates at run time when
// the orchestrator wants to override the DSL-declared value.
func (m *MessageComponent) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) {
state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx)
if err != nil {
return nil, fmt.Errorf("Message: %w", err)
}
if state == nil {
return nil, fmt.Errorf("Message: nil canvas state")
}
text, _ := inputs["text"].(string)
if text == "" {
text = m.text
}
// Message is a display node, not parameter binding. Use the
// tolerant resolver (nil refs render as empty string) instead
// of runtime.ResolveTemplate — matches the Python canvas.py
// soft-fail semantic so authoring patterns like
// {Component@head} for optional fields don't crash the run when
// the upstream list is empty. Parameter-binding call sites keep
// the loud-fail contract via runtime.ResolveTemplate.
resolved := runtime.ResolveTemplateForDisplay(text, state)
// Extract downloads. Walks inputs for download-info maps so
// callers can attach binaries to the message body.
var downloads []DownloadInfo
for _, v := range inputs {
downloads = append(downloads, ExtractDownloads(v)...)
}
// Pick the effective output format. inputs["output_format"]
// overrides the per-instance declaration so the orchestrator can
// re-render downstream.
format := m.outputFormat
if v, ok := inputs["output_format"].(string); ok {
format = OutputFormat(v)
}
rendered := Render(RenderRequest{
Format: format,
Text: resolved,
Downloads: downloads,
})
out := map[string]any{"content": rendered}
if len(downloads) > 0 {
out["downloads"] = downloads
}
// auto_play TTS dispatch. The audio bytes are returned under
// outputs["audio"] as a structured envelope; the SSE layer
// can choose to forward them on a separate event channel.
if m.autoPlay != audio.EngineEmpty {
engine := m.autoPlay
if v, ok := inputs["auto_play"]; ok {
switch x := v.(type) {
case bool:
if x {
engine = audio.EngineGTTS
}
case string:
engine = audio.Engine(x)
}
}
voice := m.voice
if v, ok := inputs["voice"].(string); ok && v != "" {
voice = v
}
lang := m.lang
if v, ok := inputs["lang"].(string); ok && v != "" {
lang = v
}
synth := audio.GetSynthesizer()
resp, ttsErr := synth.Synthesize(ctx, audio.SynthesizeRequest{
Engine: engine,
Text: rendered,
Voice: voice,
Lang: lang,
})
if ttsErr != nil {
// TTS failures are non-fatal — the textual content is
// already in `content`. Surface the error under a
// dedicated key so callers can decide whether to retry.
out["audio_error"] = ttsErr.Error()
} else if resp != nil && len(resp.Audio) > 0 {
out["audio"] = map[string]any{
"media_type": resp.MediaType,
// Base64 is the standard SSE wire shape for
// binary payloads.
"data_b64": resp.Audio,
}
}
}
// Memory persistence. The call is best-effort: a missing
// memory service returns ErrMemoryServiceMissing which we
// surface under outputs["memory_error"] so the message still
// flows.
memSave, _ := inputs["memory_save"].(bool)
if memSave {
memIDs := extractMemoryIDs(inputs)
if len(memIDs) == 0 {
// Fall back to per-instance memory_ids declared in
// the DSL — the orchestrator may not re-pass them
// when it overrides only `memory_save`.
memIDs = extractMemoryIDsFromParams(m.text)
}
if len(memIDs) > 0 {
saver := GetMemorySaver()
saveErr := saver.Save(ctx, MemorySaveRequest{
MemoryIDs: memIDs,
AgentID: state.TaskID,
SessionID: state.RunID,
UserInput: stringFromStateSys(state, "query"),
AgentResponse: rendered,
})
if saveErr != nil {
out["memory_error"] = saveErr.Error()
log.Printf("Message: memory_save failed: %v", saveErr)
}
}
}
if streamOn, _ := inputs["stream"].(bool); streamOn {
// P0: one chunk for the whole resolved content. A later phase
// can split on token / sentence boundaries.
out["streamed_chunks"] = 1
}
return out, nil
}
// extractMemoryIDs normalises a memory_ids value from inputs /
// params. Accepts []string and []any[string].
func extractMemoryIDs(inputs map[string]any) []string {
v, ok := inputs["memory_ids"]
if !ok {
return nil
}
switch x := v.(type) {
case []string:
return x
case []any:
out := make([]string, 0, len(x))
for _, item := range x {
if s, ok := item.(string); ok {
out = append(out, s)
}
}
return out
}
return nil
}
// extractMemoryIDsFromParams looks for a "_memory_ids" hint in
// the component's stored text — used as a last-ditch fallback
// when the orchestrator does not re-pass memory_ids. Returns nil
// in the common case; this helper exists to keep the public
// memory-save flow permissive about caller omissions.
func extractMemoryIDsFromParams(_ string) []string {
return nil
}
// stringFromStateSys reads a sys-level state value. Returns ""
// when state or the key is missing. Used by the memory-save path
// to pull the user's original query.
func stringFromStateSys(state *runtime.CanvasState, key string) string {
if state == nil {
return ""
}
if v, ok := state.Sys[key]; ok {
if s, ok := v.(string); ok {
return s
}
}
return ""
}
// Stream is the SSE variant. The resolved template content is
// split on sentence boundaries ([.!?]\s+ between letters/digits)
// and each sentence is emitted as a separate chunk. A trailing
// "done" marker signals end-of-stream. The chunk map's "content"
// key carries the sentence text; "done" is true on the final
// chunk.
//
// The splitter uses a regex for portable sentence boundaries
// without pulling in a tokenizer. A future tokenizer-aware
// splitter (gonja + langdetect, or a small Go
// sentence-segmentation lib) can improve break quality.
func (m *MessageComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
ch := make(chan map[string]any, 16)
go func() {
defer close(ch)
result, err := m.Invoke(ctx, inputs)
if err != nil {
select {
case ch <- map[string]any{"error": err.Error()}:
case <-ctx.Done():
}
return
}
text, _ := result["content"].(string)
if text == "" {
// Nothing to split; emit a single empty-content chunk
// plus the done marker so downstream consumers have a
// well-defined two-chunk stream.
select {
case ch <- map[string]any{"content": "", "thinking": ""}:
case <-ctx.Done():
return
}
} else {
sentences := splitSentences(text)
for _, s := range sentences {
select {
case ch <- map[string]any{"content": s, "thinking": ""}:
case <-ctx.Done():
return
}
}
}
select {
case ch <- map[string]any{"done": true, "model": result["model"]}:
case <-ctx.Done():
}
}()
return ch, nil
}
// sentenceSplitRe matches sentence boundaries: ".", "!", or "?"
// followed by whitespace. The character class keeps the
// abbreviations list short for v1; the follow-up tokenizer-aware
// splitter is a more robust replacement.
var sentenceSplitRe = regexp.MustCompile(`([.!?])\s+`)
// splitSentences splits text on sentence boundaries, preserving
// the trailing punctuation. Returns a slice of at least one
// element; empty input returns a single empty element.
func splitSentences(text string) []string {
if text == "" {
return []string{""}
}
matches := sentenceSplitRe.FindAllStringIndex(text, -1)
if len(matches) == 0 {
return []string{text}
}
out := make([]string, 0, len(matches)+1)
prev := 0
for _, m := range matches {
// Include the matched punctuation in the previous sentence
// but stop BEFORE the trailing whitespace — otherwise each
// emitted sentence has a leading space, which both the
// Message component's stream joiner and the v1 Python
// chunker would have to re-trim.
out = append(out, text[prev:m[0]+1])
prev = m[1]
}
if prev < len(text) {
out = append(out, text[prev:])
}
return out
}
// Inputs returns the public parameter surface. Field types match
// the Python DSL contract (text template, stream toggle,
// memory_save toggle).
func (m *MessageComponent) Inputs() map[string]string {
return map[string]string{
"text": "Template string with {{...}} references; resolved against the canvas state.",
"stream": "When true, the resolved content is delivered as an SSE stream.",
"memory_save": "When true, persist the message via the registered MemorySaver (default stub returns ErrMemoryServiceMissing).",
"memory_ids": "List of memory-store IDs to persist into (used when memory_save=true).",
"output_format": "'html' | 'markdown' | 'plain'. Default 'plain' when unset.",
"auto_play": "When truthy, dispatch the resolved text through the audio.Synthesizer.",
"voice": "TTS voice hint (engine-specific).",
"lang": "TTS language tag (BCP-47, e.g. 'en' or 'zh-CN').",
}
}
// Outputs returns the resolved template plus the streamed-chunk
// counter.
func (m *MessageComponent) Outputs() map[string]string {
return map[string]string{
"content": "Resolved and rendered message body.",
"streamed_chunks": "Number of SSE chunks emitted (present when stream=true).",
"downloads": "Extracted download descriptors ({doc_id, filename, mime_type, url}).",
"audio": "{media_type, data_b64} envelope populated when auto_play is wired and a TTS engine succeeds.",
"audio_error": "Surfaced when TTS dispatch fails; the textual content is still returned.",
"memory_error": "Surfaced when memory persistence fails; the textual content is still returned.",
}
}
// mapCopy shallow-copies src into a fresh map. Used to keep Message's
// passthrough outputs un-aliased from the caller's inputs map.
func mapCopy(src map[string]any) map[string]any {
out := make(map[string]any, len(src))
maps.Copy(out, src)
return out
}
func init() {
Register(componentNameMessage, NewMessageComponent)
}