Files
Zhichang Yu 3fa15c0e2f feat(agent): Go port — canvas engine, 22 components, DSL v2, 13 endpoints (#15952)
Ports the agent canvas subsystem from Python to Go.

## What's included

### Canvas Engine (Phase 0/1)
- State engine, scheduler, variable resolver, Redis checkpoint store,
cancel protocol
- **209 tests** across canvas / component / io packages

### 22 Components (P0–P4)
| Tier | Components |
|---|---|
| P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin, Message, Invoke |
| P1 T3 | VariableAggregator, VariableAssigner, StringTransform, ListOperations, DataOperations |
| P2 T3 | Iteration, IterationItem, Loop, LoopItem |
| P3 T3 | UserFillUp, Fillup |
| P4 T5 | Browser, ExcelProcessor, DocsGenerator |

### DSL v2 Schema (Phase 2.5)
- Typed v2 in-memory model with v1-to-v2 auto-detect converter
- v1 legacy field stripping per plan §2.11.7

### HTTP Endpoints & Bug Fixes (Plans PR1–PR3)
- **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)`
pattern
- **CreateAgent validation**: title/DSL required, duplicate check, error
envelope with code 103
- **13 new endpoints**: templates, prompts, tags, sessions CRUD,
chat/completions (SSE + non-stream stubs), rerun, test_db_connection,
logs, webhook/logs
- **756 Go unit tests** (745 → 756, +18)
- **17 → 0 Python integration test failures** (test_agents.py +
test_session_management/)

### Tools
21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory
stubs

### Infrastructure
OTel observability, NATS message queue, DeepDoc gRPC client, SSRF
guards, IDOR mitigation
2026-06-12 22:58:28 +08:00

312 lines
11 KiB
Go

/*
* Copyright 2026 The RAGFlow Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package otel
import (
"context"
"sync"
"time"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/schema"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/embedded"
"go.opentelemetry.io/otel/trace/noop"
)
// TracerName is the instrumentation scope name attached to every span that
// [OtelHandler] starts. It is exported so other code can request a tracer
// under the same scope.
const TracerName = "github.com/infiniflow/ragflow/internal/observability/otel"

// Private context-key types. Because they are unexported, no other package
// can build an equal key. Values stored under them can therefore only be
// read or replaced through the helpers below, never collided with from
// outside.
type (
	runIDKeyType       struct{}
	sessionIDKeyType   struct{}
	spanContextKeyType struct{}
)

// WithRunID derives a context from ctx that carries runID, the canvas run
// identifier. OtelHandler copies it into the "run.id" attribute of the
// spans it starts. Passing "" effectively clears a previously stored id:
// RunIDFromContext then reports "".
func WithRunID(ctx context.Context, runID string) context.Context {
	key := runIDKeyType{}
	return context.WithValue(ctx, key, runID)
}

// RunIDFromContext reports the run id stored by WithRunID, or "" when ctx
// carries none.
func RunIDFromContext(ctx context.Context) string {
	// A failed assertion yields the zero value "", which is the documented
	// "no run id" result.
	runID, _ := ctx.Value(runIDKeyType{}).(string)
	return runID
}

// WithSessionID derives a context from ctx that carries sessionID, the chat
// session identifier. OtelHandler copies it into the "session.id"
// attribute of the spans it starts.
func WithSessionID(ctx context.Context, sessionID string) context.Context {
	key := sessionIDKeyType{}
	return context.WithValue(ctx, key, sessionID)
}

// SessionIDFromContext reports the session id stored by WithSessionID, or
// "" when ctx carries none.
func SessionIDFromContext(ctx context.Context) string {
	sessionID, _ := ctx.Value(sessionIDKeyType{}).(string)
	return sessionID
}
// spanContextKey is the context key under which the in-flight
// *spanContextValue for the current callback invocation is stored.
//
// The former `_ = sdktrace.TracerProvider{}` placeholder was removed. It
// built a throwaway zero-value provider, which is not a usable provider;
// use sdktrace.NewTracerProvider to get one. The placeholder was also
// unnecessary, because OtelHandler and NewOtelHandler already use sdktrace.
var spanContextKey = spanContextKeyType{}
// spanContextValue is the per-invocation record stored on the callback
// context under spanContextKey. It holds the live span and the time it was
// recorded as started. The end/error callbacks look it up to finish the
// span.
type spanContextValue struct {
	span      trace.Span
	startTime time.Time
	// streamIn is a slot for the input-stream copy handed to
	// OnStartWithStreamInput. That callback closes the reader itself, so
	// whatever is stored here is already closed and must not be read from.
	// The field may also be nil.
	streamIn *schema.StreamReader[callbacks.CallbackInput]
	// streamOut is the matching slot for the OnEndWithStreamOutput copy.
	// The same rule applies: it is either nil or an already-closed reader.
	streamOut *schema.StreamReader[callbacks.CallbackOutput]
}
// OtelHandler implements [callbacks.Handler] and turns each eino component
// invocation it is notified about into an OpenTelemetry span.
//
// Concurrency: the handler keeps no per-call state on itself. Each span is
// started from a tracer that is resolved once, guarded by initOnce; OTel
// tracers are themselves safe for concurrent use. The span is then carried
// on the callback context under an unexported key. OnEnd, OnError and
// OnEndWithStreamOutput read it back from there to finish it.
//
// Pass-through mode: when the tp field is nil, OnStart and
// OnStartWithStreamInput return the context they were given without
// starting a span, so the end/error callbacks find nothing to finish.
// Streaming callbacks still close the stream copies they receive. This
// makes it cheap to install the handler globally in environments that have
// not configured an OTel collector.
type OtelHandler struct {
	// tp supplies the tracer used to start spans. nil disables tracing and
	// turns the handler into a pass-through.
	tp *sdktrace.TracerProvider
	// tracer caches tp's tracer, or a no-op tracer when tp is nil, so it
	// is not looked up on every span start. resolveTracer fills it in
	// lazily under initOnce, so a nil tp never causes a panic.
	tracer   trace.Tracer
	initOnce sync.Once
}
// NewOtelHandler returns an OtelHandler that starts spans from tp.
//
// tp may be nil. The handler then never starts a span and hands contexts
// back unchanged, so callers can install it unconditionally.
func NewOtelHandler(tp *sdktrace.TracerProvider) *OtelHandler {
	h := new(OtelHandler)
	h.tp = tp
	return h
}
// resolveTracer returns the handler's tracer and builds it on first use.
//
// With a non-nil tp the tracer comes from tp under TracerName. Otherwise a
// no-op tracer is used, so callers can always rely on a non-nil result.
// This is the only method that reads tp to build the tracer.
func (h *OtelHandler) resolveTracer() trace.Tracer {
	h.initOnce.Do(func() {
		if provider := h.tp; provider != nil {
			h.tracer = provider.Tracer(TracerName)
		} else {
			h.tracer = noop.NewTracerProvider().Tracer(TracerName)
		}
	})
	return h.tracer
}
// spanName picks the OTel span name for info.
//
// The name is "<Component>:<Name>": the component category first, then the
// node's business name. When only one of the two is set, that value is
// used on its own. When info is nil or both are empty, the span is called
// "component" so it still shows up in trace UIs.
func spanName(info *callbacks.RunInfo) string {
	const fallback = "component"
	if info == nil {
		return fallback
	}
	component, name := string(info.Component), info.Name
	if component == "" {
		if name == "" {
			return fallback
		}
		return name
	}
	if name == "" {
		return component
	}
	return component + ":" + name
}
// runAttributes builds the attribute set shared by every span the handler
// emits.
//
// "run.id" and "session.id" are always present, even when empty. When
// info is non-nil, "cpn.id"/"cpn.name" (from info.Name), "cpn.component"
// and "cpn.type" are added for each non-empty source field, in that order.
// These attributes let backends slice spans by canvas node, run or session.
func runAttributes(info *callbacks.RunInfo, runID, sessionID string) []attribute.KeyValue {
	// Two fixed attributes plus at most four derived from info.
	out := make([]attribute.KeyValue, 0, 6)
	out = append(out,
		attribute.String("run.id", runID),
		attribute.String("session.id", sessionID),
	)
	if info == nil {
		return out
	}
	// Canvas DSL nodes are loaded with their cpn_id as the node name, so
	// info.Name doubles as the component id. It is exposed under its own
	// key so dashboards can filter on it directly.
	if name := info.Name; name != "" {
		out = append(out,
			attribute.String("cpn.id", name),
			attribute.String("cpn.name", name),
		)
	}
	if kind := string(info.Component); kind != "" {
		out = append(out, attribute.String("cpn.component", kind))
	}
	if typ := info.Type; typ != "" {
		out = append(out, attribute.String("cpn.type", typ))
	}
	return out
}
// OnStart is the entry point for a non-streaming component invocation.
//
// It starts a span named by spanName, tags it with the attributes from
// runAttributes (including the run/session ids found on ctx), and stores
// the live span on the returned context. The matching OnEnd or OnError
// call finishes it from there.
//
// When the handler is nil or has no TracerProvider, ctx is returned
// unchanged and no span is started.
func (h *OtelHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, _ callbacks.CallbackInput) context.Context {
	if h == nil || h.tp == nil {
		return ctx
	}
	// A single timestamp makes the span's start time and the recorded
	// startTime agree exactly. Two separate time.Now() calls would drift
	// apart.
	now := time.Now()
	attrs := runAttributes(info, RunIDFromContext(ctx), SessionIDFromContext(ctx))
	startedCtx, span := h.resolveTracer().Start(ctx, spanName(info),
		trace.WithTimestamp(now),
		trace.WithAttributes(attrs...),
	)
	return context.WithValue(startedCtx, spanContextKey, &spanContextValue{
		span:      span,
		startTime: now,
	})
}
// OnEnd finishes the span opened by the matching OnStart.
//
// When ctx holds no live span, ctx is returned untouched. This happens
// when OnStart started none because tracing is disabled, or when the span
// was already finished and cleared by an earlier callback in the chain.
func (h *OtelHandler) OnEnd(ctx context.Context, _ *callbacks.RunInfo, _ callbacks.CallbackOutput) context.Context {
	live, _ := ctx.Value(spanContextKey).(*spanContextValue)
	if live == nil {
		return ctx
	}
	live.span.End(trace.WithTimestamp(time.Now()))
	// Shadow the key with a typed nil. A later OnEnd/OnError given the
	// returned context then sees no live span and cannot end this one again.
	var cleared *spanContextValue
	return context.WithValue(ctx, spanContextKey, cleared)
}
// OnError records err on the in-flight span, sets the span's status to
// codes.Error with err's message, and finishes the span.
//
// It is a no-op when err is nil or when ctx carries no live span. The
// latter happens if OnStart never ran or the handler is in pass-through
// mode.
func (h *OtelHandler) OnError(ctx context.Context, _ *callbacks.RunInfo, err error) context.Context {
	// A nil error is treated as a non-event, so a defensive call from the
	// framework never flips a span to Error.
	if err == nil {
		return ctx
	}
	live, _ := ctx.Value(spanContextKey).(*spanContextValue)
	if live == nil {
		return ctx
	}
	span := live.span
	span.RecordError(err, trace.WithTimestamp(time.Now()))
	span.SetStatus(codes.Error, err.Error())
	span.End(trace.WithTimestamp(time.Now()))
	// Clear the key so the returned context no longer exposes a live span.
	var cleared *spanContextValue
	return context.WithValue(ctx, spanContextKey, cleared)
}
// OnStartWithStreamInput mirrors [OtelHandler.OnStart] for streaming
// inputs.
//
// eino hands the handler its own copy of the input stream. The handler
// never reads it (consuming the stream is the downstream component's job),
// so the copy is closed immediately on every path to let the framework
// release the original. A span is then started exactly as in OnStart, and
// the live span is stored on the returned context.
//
// When the handler is nil or has no TracerProvider, the stream is still
// closed and ctx is returned unchanged.
func (h *OtelHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo,
	input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
	if input != nil {
		input.Close()
	}
	if h == nil || h.tp == nil {
		return ctx
	}
	// One timestamp for both the span start and the recorded startTime.
	now := time.Now()
	attrs := runAttributes(info, RunIDFromContext(ctx), SessionIDFromContext(ctx))
	startedCtx, span := h.resolveTracer().Start(ctx, spanName(info),
		trace.WithTimestamp(now),
		trace.WithAttributes(attrs...),
		// NOTE(review): SpanKindConsumer is intended for spans that receive
		// messages from an asynchronous producer, while OnStart uses the
		// default (internal) kind. Confirm this difference is intended.
		trace.WithSpanKind(trace.SpanKindConsumer),
	)
	// The closed reader is deliberately not stored in streamIn. Nothing can
	// read from it, and keeping it on the context would only retain it.
	return context.WithValue(startedCtx, spanContextKey, &spanContextValue{
		span:      span,
		startTime: now,
	})
}
// OnEndWithStreamOutput mirrors [OtelHandler.OnEnd] for streaming outputs.
//
// The framework hands the handler its own copy of the output stream. The
// handler does not consume it, so the copy is closed on every path. If ctx
// carries a live span, that span is finished and the key is cleared on the
// returned context. Otherwise ctx is returned unchanged.
func (h *OtelHandler) OnEndWithStreamOutput(ctx context.Context, _ *callbacks.RunInfo,
	output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
	if output != nil {
		output.Close()
	}
	v, ok := ctx.Value(spanContextKey).(*spanContextValue)
	if !ok || v == nil {
		return ctx
	}
	// The closed reader is intentionally not recorded in v.streamOut. v is
	// shared by every context derived from OnStartWithStreamInput's result,
	// and mutating it with an unusable reader only retains garbage.
	v.span.End(trace.WithTimestamp(time.Now()))
	return context.WithValue(ctx, spanContextKey, (*spanContextValue)(nil))
}
var (
	// Compile-time proof that *OtelHandler implements eino's
	// callbacks.Handler. Any signature drift fails the build here instead
	// of at the call site that installs the handler.
	_ callbacks.Handler = (*OtelHandler)(nil)

	// Nothing else in this file references package trace/embedded. This
	// declaration keeps that import in use so the file compiles.
	_ embedded.Tracer = embedded.Tracer(nil)
)