Files
ragflow/internal/service/agent.go
Zhichang Yu 2362210caf refactor(log): unify Go logging to zap with rotation, strip per-package levels (#16261)
Refactor the Go agent port's logging so every log line — gin access,
agent canvas events, harness warnings, fatal boot errors — flows through
a single common.Logger (zap) backed by a rotated file, with structured
fields, level filtering, and configurable rotation.

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-23 16:21:46 +08:00

1109 lines
41 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/cloudwego/eino/compose"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
"ragflow/internal/agent/canvas"
"ragflow/internal/agent/runtime"
agentsandbox "ragflow/internal/agent/sandbox"
agenttool "ragflow/internal/agent/tool"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
dslpkg "ragflow/internal/agent/dsl"
)
// genID32 returns a 32-char UUID-derived primary key suitable for the
// user_canvas and user_canvas_version tables. The format matches Python
// uuid.uuid4().hex used by the original DAO and keeps existing rows
// joinable across Python and Go writers.
func genID32() string {
return strings.ReplaceAll(uuid.New().String(), "-", "")[:32]
}
// ErrAgentNotOwner is returned by DeleteAgent when the canvas exists and
// is accessible to the caller but is owned by a different user. It maps
// to the Python "Only the owner of the agent is authorized for this
// operation." message via handler.mapAgentError.
//
// The Python agent API keeps access-check and owner-check as two
// separate decorators (api/apps/restful_apis/agent_api.py:74-100);
// we mirror that distinction with ErrUserCanvasNotFound (access) and
// ErrAgentNotOwner (owner).
var ErrAgentNotOwner = errors.New("agent not owned by user")
// ErrAgentStorageError is returned by RunAgent when the underlying
// version / canvas / tenant DAO surfaces a non-sentinel error (DB
// connectivity, schema drift, deadlock, etc.). The handler's
// mapAgentError recognises this sentinel and maps it to
// common.CodeServerError (500) with a SANITIZED message — the raw
// DAO error string is never echoed to the client, so internal
// connection-string / table-name leaks are avoided.
//
// v3.5.2 follow-up: the prior af2ac2eda commit claimed "DB error ->
// 500" in the branch table, but the handler's mapAgentError did not
// actually classify those errors as CodeServerError — every DAO
// failure fell through to CodeDataError with the raw err.Error()
// string. This sentinel closes that gap.
var ErrAgentStorageError = errors.New("agent storage error")
// AgentService agent service
type AgentService struct {
canvasDAO *dao.UserCanvasDAO
canvasTemplateDAO *dao.CanvasTemplateDAO
userTenantDAO *dao.UserTenantDAO
versionDAO *dao.UserCanvasVersionDAO
api4ConversationDAO *dao.API4ConversationDAO
// driver is the per-process runner that drives canvas
// invocations and produces SSE events. V1 persistence is
// in-memory; a follow-up phase moves to Redis per plan §4.9.
runner *canvas.Runner
// Phase 4.4 V2 — Redis-backed run infrastructure. nil = in-memory
// / no-tracking (test path, current production boot path until
// cmd/server_main.go wires them in v3.6.0).
//
// checkpointStore + stateSerializer feed canvas.WithCheckPointStore
// / canvas.WithStateSerializer so every Compile's check-point
// payload and CanvasState snapshot round-trip to Redis.
checkpointStore canvas.CheckPointStore
stateSerializer canvas.StateSerializer
// runTracker records per-run lifecycle (Start / MarkSucceeded /
// MarkFailed / MarkCancelled) to Redis hash "agent:run:{id}".
runTracker *canvas.RunTracker
// runMu and runStreams coordinate active canvas run goroutines so that
// CancelAgent can signal a specific canvas. The map is keyed by canvas
// ID; values are channels that close to signal cancellation.
runMu sync.Mutex
runStreams map[string]chan struct{}
}
// NewAgentService create agent service
func NewAgentService() *AgentService {
return NewAgentServiceWithOptions(nil, nil, nil)
}
// NewAgentServiceWithOptions is the production constructor that
// injects the Redis-backed run infrastructure. The zero-arg
// NewAgentService() remains as a thin wrapper that calls this with
// all-nil options so existing call sites (cmd/server_main.go,
// handler tests, agent_test.go) keep compiling.
//
// Phase 4.4 V2: production boot wiring is deferred to v3.6.0; until
// then, tests can construct AgentService instances with mocked
// stores/tracker to exercise the real Compile/Invoke path without
// requiring Redis.
func NewAgentServiceWithOptions(
cp canvas.CheckPointStore,
ser canvas.StateSerializer,
rt *canvas.RunTracker,
) *AgentService {
if stub, ok := agenttool.GetSandboxClient().(interface{ IsStubSandboxClient() bool }); ok && stub.IsStubSandboxClient() {
agenttool.SetSandboxClient(agentsandbox.NewManagerClient())
}
return &AgentService{
canvasDAO: dao.NewUserCanvasDAO(),
canvasTemplateDAO: dao.NewCanvasTemplateDAO(),
userTenantDAO: dao.NewUserTenantDAO(),
versionDAO: dao.NewUserCanvasVersionDAO(),
api4ConversationDAO: dao.NewAPI4ConversationDAO(),
runner: canvas.NewRunner(),
runStreams: make(map[string]chan struct{}),
checkpointStore: cp,
stateSerializer: ser,
runTracker: rt,
}
}
// ListTemplates returns every canvas template. Mirrors Python
// agent_api.list_agent_template, which iterates CanvasTemplateService.get_all()
// and serialises each row.
func (s *AgentService) ListTemplates() ([]*entity.CanvasTemplate, error) {
return s.canvasTemplateDAO.GetAll()
}
// AgentItem is one entry in the list response.
type AgentItem struct {
ID string `json:"id"`
Avatar *string `json:"avatar,omitempty"`
Title *string `json:"title,omitempty"`
Permission string `json:"permission"`
CanvasType *string `json:"canvas_type,omitempty"`
CanvasCategory string `json:"canvas_category"`
CreateTime *int64 `json:"create_time,omitempty"`
UpdateTime *int64 `json:"update_time,omitempty"`
}
// ListAgentsResponse is the response body for GET /api/v1/agents.
type ListAgentsResponse struct {
Canvas []*AgentItem `json:"canvas"`
Total int64 `json:"total"`
}
func toAgentItem(c *entity.UserCanvas) *AgentItem {
return &AgentItem{
ID: c.ID,
Avatar: c.Avatar,
Title: c.Title,
Permission: c.Permission,
CanvasType: c.CanvasType,
CanvasCategory: c.CanvasCategory,
CreateTime: c.CreateTime,
UpdateTime: c.UpdateTime,
}
}
// ListAgents returns agent canvases visible to userID.
// Mirrors Python agent_api.list_agents — validates owner_ids against joined tenants,
// then delegates to the DAO.
func (s *AgentService) ListAgents(userID string, keywords string, page, pageSize int, orderby string, desc bool, ownerIDs []string, canvasCategory string) (*ListAgentsResponse, common.ErrorCode, error) {
// Build the set of tenant IDs the user is authorised to query.
tenantIDs, err := s.userTenantDAO.GetTenantIDsByUserID(userID)
if err != nil {
return nil, common.CodeServerError, fmt.Errorf("failed to get tenant IDs: %w", err)
}
authorised := make(map[string]struct{}, len(tenantIDs)+1)
for _, id := range tenantIDs {
authorised[id] = struct{}{}
}
authorised[userID] = struct{}{}
var effectiveOwnerIDs []string
if len(ownerIDs) > 0 {
for _, id := range ownerIDs {
if _, ok := authorised[id]; !ok {
return nil, common.CodeOperatingError, fmt.Errorf("only authorized owner_ids can be queried")
}
}
effectiveOwnerIDs = ownerIDs
} else {
effectiveOwnerIDs = make([]string, 0, len(authorised))
for id := range authorised {
effectiveOwnerIDs = append(effectiveOwnerIDs, id)
}
}
canvases, total, err := s.canvasDAO.ListByTenantIDs(
effectiveOwnerIDs,
userID,
page,
pageSize,
orderby,
desc,
keywords,
canvasCategory,
)
if err != nil {
return nil, common.CodeServerError, fmt.Errorf("failed to list agents: %w", err)
}
items := make([]*AgentItem, len(canvases))
for i, c := range canvases {
items[i] = toAgentItem(c)
}
return &ListAgentsResponse{Canvas: items, Total: total}, common.CodeSuccess, nil
}
// CreateAgentRequest is the input shape for CreateAgent.
type CreateAgentRequest struct {
UserID string `json:"user_id"`
Title *string `json:"title,omitempty"`
Description *string `json:"description,omitempty"`
Permission string `json:"permission"`
CanvasType *string `json:"canvas_type,omitempty"`
CanvasCategory string `json:"canvas_category"`
DSL entity.JSONMap `json:"dsl,omitempty"`
}
// CreateAgent inserts a new user_canvas row. ID is assigned here.
//
// Returns the standard (T, common.ErrorCode, error) triple so the handler
// can map validation/duplicate errors to codes 101/102 without
// introducing a separate error type. Missing DSL or title and a
// duplicate title under the same owner all surface as specific code
// values that the Python agent API contract expects.
func (s *AgentService) CreateAgent(ctx context.Context, req *CreateAgentRequest) (*entity.UserCanvas, common.ErrorCode, error) {
if req == nil {
return nil, common.CodeArgumentError, errors.New("create agent: nil request")
}
if req.DSL == nil {
return nil, common.CodeArgumentError, errors.New("No DSL data in request.")
}
if req.Title == nil || strings.TrimSpace(*req.Title) == "" {
return nil, common.CodeArgumentError, errors.New("No title in request.")
}
title := strings.TrimSpace(*req.Title)
req.Title = &title
if existing, err := s.canvasDAO.GetByUserAndTitle(req.UserID, title, req.CanvasCategory); err != nil {
return nil, common.CodeServerError, fmt.Errorf("check duplicate title: %w", err)
} else if existing != nil {
return nil, common.CodeDataError, errors.New(title + " already exists.")
}
if req.Permission == "" {
req.Permission = "me"
}
if req.CanvasCategory == "" {
req.CanvasCategory = "agent_canvas"
}
// Normalize legacy v1 / Go-v2 payloads to a React-Flow-shaped graph so
// the front-end can render the canvas without a migration. Idempotent;
// no-op when graph.nodes is already non-empty.
req.DSL = dslpkg.NormalizeForCanvas(req.DSL)
row := &entity.UserCanvas{
ID: genID32(),
UserID: req.UserID,
Title: req.Title,
Description: req.Description,
Permission: req.Permission,
CanvasType: req.CanvasType,
CanvasCategory: req.CanvasCategory,
DSL: req.DSL,
}
if err := s.canvasDAO.Create(row); err != nil {
return nil, common.CodeServerError, fmt.Errorf("create agent: %w", err)
}
return row, common.CodeSuccess, nil
}
// loadCanvasForUser is the shared IDOR guard used by every non-List
// canvas method. It resolves the caller's tenant set, then asks the DAO
// to load the canvas subject to the (owner OR team-in-tenant) predicate.
// On miss or access-deny it returns dao.ErrUserCanvasNotFound so the
// handler layer can map every "not yours" case to the same 404 envelope
// — see plan §4.8 IDOR mitigation.
//
// DAO-error sanitisation (v3.5.2 follow-up): the raw userTenantDAO and
// canvasDAO errors are wrapped with ErrAgentStorageError so mapAgentError
// classifies them as CodeServerError (500) with a sanitized message —
// the original DAO error string (DSN, table name, gorm stack frame)
// MUST NOT reach the client. Sentinels (ErrUserCanvasNotFound) pass
// through unchanged so they keep mapping to the 404 envelope.
//
// This is the FIRST storage access path RunAgent hits, so leaving the
// raw errors here would have left a DAO-string leak in the very first
// hop — the earlier af2ac2eda + 804854a5e commits only sanitised the
// version-read path, missing the canvas-access path.
func (s *AgentService) loadCanvasForUser(ctx context.Context, userID, canvasID string) (*entity.UserCanvas, error) {
if canvasID == "" {
return nil, dao.ErrUserCanvasNotFound
}
if userID == "" {
return nil, dao.ErrUserCanvasNotFound
}
tenants, err := s.userTenantDAO.GetTenantIDsByUserID(userID)
if err != nil {
if errors.Is(err, dao.ErrUserCanvasNotFound) {
return nil, err
}
return nil, fmt.Errorf("tenants for user %s: %w: %w", userID, err, ErrAgentStorageError)
}
row, err := s.canvasDAO.GetByIDForUser(canvasID, userID, tenants)
if err != nil {
if errors.Is(err, dao.ErrUserCanvasNotFound) {
return nil, err
}
return nil, fmt.Errorf("load canvas %q for user %s: %w: %w", canvasID, userID, err, ErrAgentStorageError)
}
return row, nil
}
// GetAgent returns a single canvas visible to the requesting user.
// Returns dao.ErrUserCanvasNotFound (not 403) when the canvas is missing
// or belongs to another user.
func (s *AgentService) GetAgent(ctx context.Context, userID, canvasID string) (*entity.UserCanvas, error) {
return s.loadCanvasForUser(ctx, userID, canvasID)
}
// UpdateAgent writes a new DSL to the draft (user_canvas.dsl) and toggles
// release=false. The call does NOT create a new user_canvas_version row —
// versions are produced only by PublishAgent.
func (s *AgentService) UpdateAgent(ctx context.Context, userID, canvasID string, dsl entity.JSONMap) error {
row, err := s.loadCanvasForUser(ctx, userID, canvasID)
if err != nil {
return err
}
row.DSL = dslpkg.NormalizeForCanvas(dsl)
row.Release = false
if err := s.canvasDAO.Update(row); err != nil {
return fmt.Errorf("update agent %s: %w", canvasID, err)
}
return nil
}
// ResetAgent clears the per-run state of a canvas (history, retrieval,
// memory, path) and zeroes every "sys.*" / "env.*" global, mirroring
// the Python handler at api/apps/restful_apis/agent_api.py:992. The
// reset transform is a pure DSL mutation; the persisted row in
// user_canvas.dsl is rewritten in place and the freshly reset DSL is
// returned so the caller can render it back to the client without an
// extra GET.
//
// Reset does NOT create a new user_canvas_version row — that mirrors
// the Python behaviour and UpdateAgent: versions are owned by
// PublishAgent. It also does NOT touch the in-flight run state of any
// currently executing canvas session; that is owned by the Python task
// executor and is out of scope for the Go port.
//
// Errors propagate the same way as GetAgent: a missing canvas, or a
// canvas that the user has no access to, surfaces as
// dao.ErrUserCanvasNotFound so mapAgentError emits the same 404 the
// Python handler does for "canvas not found.".
func (s *AgentService) ResetAgent(ctx context.Context, userID, canvasID string) (entity.JSONMap, error) {
row, err := s.loadCanvasForUser(ctx, userID, canvasID)
if err != nil {
return nil, err
}
reset := dslpkg.ResetForCanvas(map[string]any(row.DSL))
// Re-normalize through the same entry point UpdateAgent uses so
// any front-end that reads `graph.nodes` / `components[*].obj`
// right after the response sees a renderable shape, not a partial
// reset that left the legacy short-form DSL intact.
row.DSL = dslpkg.NormalizeForCanvas(reset)
row.Release = false
if err := s.canvasDAO.Update(row); err != nil {
return nil, fmt.Errorf("reset agent %s: %w", canvasID, err)
}
return row.DSL, nil
}
// DeleteAgent removes the canvas and cascades to its user_canvas_version
// rows in a single transaction so a mid-flight failure cannot leave
// orphan version rows (Phase 5 §2.9; review follow-up M2).
//
// Owner-only by design (mirrors _require_canvas_owner_sync in the Python
// agent API). Both "canvas does not exist" and "canvas is owned by
// someone else" surface as ErrAgentNotOwner so the handler emits the
// single "Only the owner..." 103 message — same envelope as the Python
// decorator (api/apps/restful_apis/agent_api.py:94-100), which uses
// UserCanvasService.query(user_id=..., id=...) and conflates those two
// cases into one OPERATING_ERROR response.
func (s *AgentService) DeleteAgent(ctx context.Context, userID, canvasID string) error {
row, err := s.canvasDAO.GetByID(canvasID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return ErrAgentNotOwner
}
return fmt.Errorf("load agent %s: %w", canvasID, err)
}
if row.UserID != userID {
return ErrAgentNotOwner
}
return dao.DB.Transaction(func(tx *gorm.DB) error {
if _, err := s.versionDAO.DeleteByCanvasIDTx(tx, canvasID); err != nil {
return fmt.Errorf("delete agent: cascade versions: %w", err)
}
if err := s.canvasDAO.DeleteTx(tx, canvasID); err != nil {
return fmt.Errorf("delete agent %s: %w", canvasID, err)
}
return nil
})
}
// PublishAgentRequest is the input shape for PublishAgent.
type PublishAgentRequest struct {
Title *string `json:"title,omitempty"`
Description *string `json:"description,omitempty"`
DSL entity.JSONMap `json:"dsl,omitempty"`
}
// PublishAgent appends a new user_canvas_version row and marks the parent
// canvas as released in a single transaction. Existing versions are never
// overwritten (§2.9); the parent canvas DSL/title/description/release
// fields are updated atomically with the new version row.
func (s *AgentService) PublishAgent(ctx context.Context, userID, canvasID string, req *PublishAgentRequest) (*entity.UserCanvasVersion, error) {
canvas, err := s.loadCanvasForUser(ctx, userID, canvasID)
if err != nil {
return nil, err
}
dsl := canvas.DSL
title := canvas.Title
description := canvas.Description
if req != nil {
if req.DSL != nil {
dsl = dslpkg.NormalizeForCanvas(req.DSL)
}
if req.Title != nil {
title = req.Title
}
if req.Description != nil {
description = req.Description
}
}
row := &entity.UserCanvasVersion{
ID: genID32(),
UserCanvasID: canvasID,
Title: title,
Description: description,
DSL: dsl,
}
if err := dao.DB.Transaction(func(tx *gorm.DB) error {
if err := s.versionDAO.CreateTx(tx, row); err != nil {
return fmt.Errorf("publish agent %s: insert version: %w", canvasID, err)
}
canvas.DSL = dsl
canvas.Title = title
canvas.Description = description
canvas.Release = true
if err := s.canvasDAO.UpdateTx(tx, canvas); err != nil {
return fmt.Errorf("publish agent %s: update parent: %w", canvasID, err)
}
return nil
}); err != nil {
return nil, err
}
return row, nil
}
// ListVersions returns every version for a canvas the user can see,
// newest first. The parent-canvas access check is enforced before the
// version list is loaded so unauthorized users cannot enumerate version
// ids of canvases they cannot read.
func (s *AgentService) ListVersions(ctx context.Context, userID, canvasID string) ([]*entity.UserCanvasVersion, error) {
if _, err := s.loadCanvasForUser(ctx, userID, canvasID); err != nil {
return nil, err
}
return s.versionDAO.ListByCanvasID(canvasID)
}
// GetVersion returns a single version of a canvas the user can see.
// Returns dao.ErrUserCanvasVersionNotFound when the version does not
// exist or belongs to a different canvas, and
// dao.ErrUserCanvasNotFound when the parent canvas is not visible to
// the requesting user.
func (s *AgentService) GetVersion(ctx context.Context, userID, canvasID, versionID string) (*entity.UserCanvasVersion, error) {
if versionID == "" {
return nil, dao.ErrUserCanvasVersionNotFound
}
if _, err := s.loadCanvasForUser(ctx, userID, canvasID); err != nil {
return nil, err
}
row, err := s.versionDAO.GetByID(versionID)
if err != nil {
return nil, err
}
if row.UserCanvasID != canvasID {
return nil, dao.ErrUserCanvasVersionNotFound
}
return row, nil
}
// DeleteVersion removes a single version of a canvas the user can see.
// Returns dao.ErrUserCanvasVersionNotFound when the row does not exist
// (or belongs to a different canvas) and dao.ErrUserCanvasNotFound when
// the parent canvas is not visible to the requesting user.
func (s *AgentService) DeleteVersion(ctx context.Context, userID, canvasID, versionID string) error {
if versionID == "" {
return dao.ErrUserCanvasVersionNotFound
}
if _, err := s.loadCanvasForUser(ctx, userID, canvasID); err != nil {
return err
}
row, err := s.versionDAO.GetByID(versionID)
if err != nil {
return err
}
if row.UserCanvasID != canvasID {
return dao.ErrUserCanvasVersionNotFound
}
return dao.DB.Transaction(func(tx *gorm.DB) error {
return s.versionDAO.DeleteTx(tx, versionID)
})
}
// RunAgent starts a run for the given canvas and returns a channel of
// orchestrator events the HTTP layer streams back as SSE. The driver owns
// the wait-for-user cycle (eino interrupt, gap-analysis §11.6.4): the
// RunFunc returns an interrupt error when a UserFillUp node pauses the
// graph, the driver persists the interrupt id keyed by (canvasID,
// sessionID), and resumes when the next call supplies a non-empty
// userInput — at which point it injects (__resume_interrupt_id__,
// __resume_data__) into root so the RunFunc can call
// compose.ResumeWithData(ctx, id, data) before invoking the workflow.
//
// sessionID is required for the multi-turn cycle: the handler generates
// one (UUID) on the first call and reuses it on follow-up posts. version
// selects the published UserCanvasVersion row; "" uses the latest version.
//
// The per-run RunFunc is built by buildRunFunc — see its doc comment
// for the full production chain (real Compile/Invoke, resume path,
// error-layering contract).
func (s *AgentService) RunAgent(ctx context.Context, userID, canvasID, sessionID, version, userInput string) (<-chan canvas.RunEvent, error) {
canvasRow, err := s.loadCanvasForUser(ctx, userID, canvasID)
if err != nil {
return nil, err
}
if sessionID == "" {
sessionID = strings.ReplaceAll(uuid.New().String(), "-", "")
}
// Load the version row up front so the run is bound to a real DSL.
//
// IDOR guard (v3.5.2 review): when the caller supplies an explicit
// version id, the row we load must belong to canvasID — otherwise
// any caller who knows a foreign version id could run that
// canvas's DSL against their own canvas (a clear
// integrity/authorization boundary breach). GetVersion (at the
// read path) already enforces this check; the run path did not.
// We mirror that check here and surface ErrUserCanvasVersionNotFound
// so the handler maps to a clean 404 rather than silently using
// the foreign DSL.
//
// DAO-error visibility (v3.5.2 review): the previous code did
// `versionRow, _ = ...` for both GetByID and GetLatest, which
// masked every database failure as "no version published" and
// let real ops issues hide behind the V1 placeholder answer.
// We now distinguish three cases:
// - explicit version, row not found → 404
// - explicit version, row from other canvas → 404 (IDOR)
// - explicit version, DB error → 500 (surface it)
// - latest path, no rows + no error → fall back to canvasRow.DSL (matches Python `completion()`)
// - latest path, DB error → 500 (surface it)
//
// v3.6 follow-up: when no published version exists, fall back to
// the canvas's current editable DSL (canvasRow.DSL) instead of
// the "no published version" placeholder. The Python reference at
// api/db/services/canvas_service.py:332 does the same via
// UserCanvasService.get_agent_dsl_with_release(agent_id,
// release_mode=False, tenant_id=...) when release_mode is unset
// on the request — the front-end's auto-save-on-run path means
// the editable DSL is what the user just clicked "Run" against.
// The buildRunFunc placeholder branch is reserved for the rare
// "canvas exists but has no DSL at all" edge case.
var (
versionRow *entity.UserCanvasVersion
dsl map[string]any
)
if version != "" {
row, err := s.versionDAO.GetByID(version)
if err != nil {
if errors.Is(err, dao.ErrUserCanvasVersionNotFound) {
return nil, fmt.Errorf("RunAgent: load version %q: %w", version, err)
}
// Wrap DB-side errors with ErrAgentStorageError so
// the handler maps them to CodeServerError (500)
// with a sanitized message — raw DAO strings
// (DSNs, table names, gorm stack frames) MUST NOT
// reach the client.
return nil, fmt.Errorf("RunAgent: load version %q: %w: %w", version, err, ErrAgentStorageError)
}
if row.UserCanvasID != canvasID {
// IDOR — caller asked to run version X against canvas
// Y, but version X belongs to canvas Z. Surface the
// same "not found" envelope the read path uses.
return nil, fmt.Errorf(
"RunAgent: version %q belongs to canvas %q, not %q: %w",
version, row.UserCanvasID, canvasID,
dao.ErrUserCanvasVersionNotFound,
)
}
versionRow = row
}
if versionRow == nil {
row, lerr := s.versionDAO.GetLatest(canvasID)
switch {
case lerr == nil:
versionRow = row
case errors.Is(lerr, dao.ErrUserCanvasVersionNotFound):
// No published version — fall back to the canvas's
// current editable DSL (see v3.6 follow-up comment
// above). Mirrors Python's
// `get_agent_dsl_with_release(...release_mode=False)`
// fallback in completion().
if len(canvasRow.DSL) > 0 {
dsl = dslpkg.NormalizeForRun(map[string]any(canvasRow.DSL))
}
default:
// Wrap DB-side errors with ErrAgentStorageError
// for the same reason as above (no DAO-string
// leak to the client).
return nil, fmt.Errorf("RunAgent: load latest version for canvas %q: %w: %w", canvasID, lerr, ErrAgentStorageError)
}
}
if dsl == nil {
dsl = normalisedDSLForRun(versionRow)
}
run := s.buildRunFunc(canvasID, versionRow, dsl)
root := map[string]any{
"canvas_id": canvasID,
"version_id": version,
"session_id": sessionID,
"user_id": userID,
}
if userInput != "" {
root["user_input"] = userInput
}
if dsl != nil {
root["__dsl_present__"] = true
}
// Phase 4.4 V2.1 (v3.6.1): populate root["tenant_id"] so the
// RunTracker.Start call (in buildRunFunc) records the run
// under the right tenant. The lookup is best-effort — a
// failure here (DAO down, user has no tenants) logs and
// continues with an empty tenant_id rather than failing
// the run; the run still works, the only loss is the
// per-tenant filterability of the run-history log.
if tenantIDs, terr := s.userTenantDAO.GetTenantIDsByUserID(userID); terr == nil && len(tenantIDs) > 0 {
root["tenant_id"] = tenantIDs[0]
} else if terr != nil {
common.Warn("service: RunAgent userTenantDAO.GetTenantIDsByUserID (best-effort, run not blocked)",
zap.String("user_id", userID),
zap.Error(terr))
}
// v3.6.1 diagnostic: log what RunAgent put into root so we can
// confirm tenant_id / user_id / session_id / user_input all
// reached the buildRunFunc closure (which runs in the runner's
// goroutine, possibly after a context switch).
common.Debug("RunAgent root",
zap.String("canvasID", canvasID),
zap.String("userID", userID),
zap.String("sessionID", sessionID),
zap.Any("tenantID", root["tenant_id"]),
zap.Int("userInput_len", func() int { s, _ := root["user_input"].(string); return len(s) }()))
return s.runner.Run(ctx, run, canvasID, sessionID, userInput, root), nil
}
// buildRunFunc assembles the per-run RunFunc the orchestrator (canvas.Runner)
// drives.
//
// Phase 4.4 V2: this is the real Compile/Invoke path. The previous
// V1 echo placeholder returned a synthesised answer without ever
// calling canvas.Compile — every RunAgent invocation pretended to
// run the canvas. The V2 body actually compiles the DSL, attaches
// the CanvasState to ctx via runtime.WithState (so component bodies
// can read it via runtime.GetStateFromContext), invokes the
// workflow, and surfaces real output through the Runner's existing
// answer-extraction contract.
//
// Nil-versionRow guard: the RunAgent call site treats "no version
// published" as a legal state and passes nil. We extract taskID
// safely and, when both versionRow and dsl are empty, fall back to
// a graceful "no published version" placeholder so the SSE surface
// still flows (TestRunAgent_NoVersionPublishedPlaceholder pins this
// behaviour). The placeholder is written into state.Outputs under
// (cpn="answer", bucket="answer") so the answer extraction in
// first-pass lookup picks it up; the same trick the V1 placeholder
// used (the v3.5.2 fix landed this and we keep it).
//
// Resume path: Runner.Run injects (__resume_interrupt_id__,
// __resume_data__) into root when userInput arrives on a session
// that previously paused at a wait-for-user interrupt. We consume
// them here and decorate ctx with compose.ResumeWithData so the
// targeted UserFillUp node (compile.go:53-55 lists them via
// compose.WithInterruptBeforeNodes) resumes and reads the user's
// follow-up via compose.GetResumeContext.
func (s *AgentService) buildRunFunc(canvasID string, versionRow *entity.UserCanvasVersion, dsl map[string]any) canvas.RunFunc {
return func(ctx context.Context, root map[string]any) (*canvas.CanvasState, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
// Extract the event channel + metadata injected by Runner.Run.
events, _ := root["__events__"].(chan canvas.RunEvent)
messageID, _ := root["__message_id__"].(string)
taskID, _ := root["__task_id__"].(string)
sessionID, _ := root["__session_id__"].(string)
// Helper to build an SSE event with metadata.
emit := func(typ, data string) {
if events == nil {
return
}
canvas.PushEvent(events, canvas.RunEvent{
Type: typ, Data: data,
MessageID: messageID,
CreatedAt: time.Now().Unix(),
TaskID: taskID,
SessionID: sessionID,
})
}
startedAt := float64(time.Now().UnixNano()) / 1e9
userInput := ""
if v, ok := root["user_input"].(string); ok {
userInput = v
}
resumeID, isResume := root["__resume_interrupt_id__"].(string)
if !isResume || resumeID == "" {
wsData, _ := json.Marshal(map[string]any{"inputs": userInput})
emit("workflow_started", string(wsData))
}
runID := runIDFor(canvasID, root)
state := canvas.NewCanvasState(runID, taskID)
// Graceful placeholder: no version published AND no DSL.
if versionRow == nil && len(dsl) == 0 {
answer := fmt.Sprintf("No published version found for canvas %q — publish a version before running.", canvasID)
state.RecordOutput("answer", "answer", answer)
// Emit a message event so the SSE surface matches the
// normal-completion shape (test asserts message +
// workflow_finished + done for the placeholder path).
msgData, _ := json.Marshal(canvas.MessageEvent{Content: answer})
meData, _ := json.Marshal(canvas.MessageEndEvent{})
emit("message", string(msgData))
emit("message_end", string(meData))
wfData, _ := json.Marshal(map[string]any{"outputs": answer})
emit("workflow_finished", string(wfData))
return state, nil
}
// DSL → *Canvas.
c, err := decodeCanvasFromDSL(dsl)
if err != nil {
s.markRunFailed(ctx, runID, "decode: "+err.Error())
return nil, err
}
// Store events channel + run metadata on the context so the
// per-node statePre/statePost wrappers (in scheduler.go) can
// emit node_started / node_finished events at the correct
// per-node lifecycle points. Context is used (rather than
// state.Sys) because eino's WithGenLocalState creates a fresh
// CanvasState per run — only the context thread survives from
// the service layer into the state handlers.
ctx2 := canvas.WithRunMeta(ctx, &canvas.RunMeta{
Events: events,
MessageID: messageID,
TaskID: taskID,
SessionID: sessionID,
})
// Seed initial env/sys values from the Canvas DSL globals.
// Python's self.globals dict stores "sys.*" and "env.*" under
// their full dotted keys; the Go port splits these into Sys /
// Env / Globals maps so GetVar("env.counter") can look up
// Env["counter"] directly. Without seeding, Env starts empty
// and every env.* reference resolves to nil (unresolved ref).
if c.Globals != nil {
for k, v := range c.Globals {
if strings.HasPrefix(k, "sys.") {
state.Sys[strings.TrimPrefix(k, "sys.")] = v
} else if strings.HasPrefix(k, "env.") {
state.Env[strings.TrimPrefix(k, "env.")] = v
} else {
state.Globals[k] = v
}
}
}
state.Sys["query"] = userInput
if tid, ok := root["tenant_id"].(string); ok && tid != "" {
state.Sys["tenant_id"] = tid
}
ctx2 = runtime.WithState(ctx2, state)
// Resume path. The user input is the resume payload for the
// previously-paused UserFillUp node — it should NOT also be
// presented to UserFillUp:Menu (the first interactive node)
// as a fresh "menu selection". Without this distinction, on
// the follow-up RunAgent call sys.query=resume_payload would
// be consumed by initialUserFillUpData in the menu body, the
// menu would pick up the resume text as a brand-new branch
// choice, Switch:Route would route to that branch, and the
// previously-paused branch would be silently dropped (the
// "second input doesn't resume" symptom). Clear sys.query so
// the menu's initial-input fast path returns false and the
// body falls through to compose.Interrupt — the menu pauses
// for fresh input next time the user actually wants a
// different branch.
if isResume && resumeID != "" {
resumeData := root["__resume_data__"]
delete(root, "__resume_interrupt_id__")
delete(root, "__resume_data__")
state.Sys["query"] = ""
ctx2 = compose.ResumeWithData(ctx2, resumeID, resumeData)
}
if s.runTracker != nil {
_ = s.runTracker.Start(ctx2, runID, canvasID,
tenantIDFromRoot(root), userInput)
}
// Compile.
var cc *canvas.CompiledCanvas
switch {
case s.checkpointStore != nil && s.stateSerializer != nil:
cc, err = canvas.Compile(ctx2, c,
canvas.WithCheckPointStore(s.checkpointStore),
canvas.WithStateSerializer(s.stateSerializer),
)
case s.checkpointStore != nil:
cc, err = canvas.Compile(ctx2, c,
canvas.WithCheckPointStore(s.checkpointStore),
)
default:
cc, err = canvas.Compile(ctx2, c)
}
if err != nil {
common.Debug("RunAgent compile err",
zap.String("canvas", canvasID),
zap.String("session", sessionID),
zap.String("task", taskID),
zap.String("run", runID),
zap.String("type", fmt.Sprintf("%T", err)),
zap.Error(err))
s.markRunFailed(ctx2, runID, "compile: "+err.Error())
return nil, fmt.Errorf("canvas compile: %w: %w", ErrAgentStorageError, err)
}
cpID := ""
if s.checkpointStore != nil {
cpID = runID
}
// Invoke.
var invokeOpts []compose.Option
if cpID != "" {
invokeOpts = []compose.Option{compose.WithCheckPointID(cpID)}
}
// On a resume, the user input is the resume payload for the
// previously-paused UserFillUp node — it does NOT represent
// a fresh sys.query. The begin node writes inputs["query"]
// straight into state.Sys["query"] (begin.go:76), and
// UserFillUp:Menu's initialUserFillUpData reads sys.query
// back to drive the menu's initial-input fast path. If we
// pass userInput through here on a resume, the menu would
// re-consume the resume text as a brand-new branch choice
// and Switch:Route would route to a fresh branch — the
// previously-paused branch would be silently dropped (the
// "second input doesn't resume" symptom reported for
// categorize / iteration / code / wait_input etc.).
wfInput := userInput
if isResume && resumeID != "" {
wfInput = ""
}
_, err = cc.Workflow.Invoke(ctx2, map[string]any{"query": wfInput}, invokeOpts...)
if cpID != "" && s.runTracker != nil {
_ = s.runTracker.AttachCheckpoint(ctx2, runID, cpID)
}
// Collect answer and references from the state snapshot.
// node_finished events are already emitted per-node by the
// statePost wrappers in scheduler.go.
var answer string
var reference []interface{}
now := float64(time.Now().UnixNano()) / 1e9
for _, bucket := range state.Snapshot() {
if v, ok := bucket["answer"].(string); ok && v != "" {
if answer == "" {
answer = v
}
}
if v, ok := bucket["content"].(string); ok && v != "" && answer == "" {
answer = v
}
if v, ok := bucket["result"].(string); ok && v != "" && answer == "" {
answer = v
}
if v, ok := bucket["reference"].([]interface{}); ok {
reference = append(reference, v...)
}
}
if err != nil {
common.Debug("RunAgent invoke err",
zap.String("canvas", canvasID),
zap.String("session", sessionID),
zap.String("task", taskID),
zap.String("run", runID),
zap.String("type", fmt.Sprintf("%T", err)),
zap.Error(err))
if canvas.IsInterruptError(err) {
s.markRunFailed(ctx2, runID, "interrupt: "+err.Error())
if answer != "" {
msgData, _ := json.Marshal(canvas.MessageEvent{
Content: answer,
Reference: reference,
})
emit("message", string(msgData))
meData, _ := json.Marshal(canvas.MessageEndEvent{
Reference: reference,
})
emit("message_end", string(meData))
}
return state, err
}
if shouldTreatAsCompletedLoopRun(err, answer) {
msgData, _ := json.Marshal(canvas.MessageEvent{
Content: answer,
Reference: reference,
})
emit("message", string(msgData))
meData, _ := json.Marshal(canvas.MessageEndEvent{
Reference: reference,
})
emit("message_end", string(meData))
wfData, _ := json.Marshal(map[string]interface{}{
"inputs": map[string]string{"query": userInput},
"outputs": answer,
"elapsed_time": now - startedAt,
"created_at": now,
})
emit("workflow_finished", string(wfData))
s.markRunSucceeded(ctx2, runID)
return state, nil
}
s.markRunFailed(ctx2, runID, "invoke: "+err.Error())
return nil, fmt.Errorf("canvas invoke: %w: %w", ErrAgentStorageError, err)
}
// Emit message + message_end (mirrors Python's ans dict).
msgData, _ := json.Marshal(canvas.MessageEvent{
Content: answer,
Reference: reference,
})
emit("message", string(msgData))
meData, _ := json.Marshal(canvas.MessageEndEvent{
Reference: reference,
})
emit("message_end", string(meData))
// Emit workflow_finished with the final outputs.
wfData, _ := json.Marshal(map[string]interface{}{
"inputs": map[string]string{"query": userInput},
"outputs": answer,
"elapsed_time": now - startedAt,
"created_at": now,
})
emit("workflow_finished", string(wfData))
s.markRunSucceeded(ctx2, runID)
return state, nil
}
}
// runIDFor builds the per-run CanvasState identifier: canvasID
// alone for first-touch runs, canvasID + sessionID for resumed runs
// (so two concurrent sessions on the same canvas don't collide in
// the snapshot map).
func runIDFor(canvasID string, root map[string]any) string {
if s, ok := root["session_id"].(string); ok && s != "" {
return canvasID + "-" + s
}
return canvasID
}
// tenantIDFromRoot returns the optional tenant_id that the handler
// may have populated on the root map. Empty when absent — the
// RunTracker stores "" as the tenant id, which the test suite
// already exercises.
func tenantIDFromRoot(root map[string]any) string {
if s, ok := root["tenant_id"].(string); ok {
return s
}
return ""
}
func shouldTreatAsCompletedLoopRun(err error, answer string) bool {
if err == nil || answer == "" {
return false
}
msg := err.Error()
return strings.Contains(msg, "[GraphRunError] no tasks to execute")
}
// markRunSucceeded records the run as completed successfully via
// the Redis-backed RunTracker. No-op when tracker is nil (test path)
// or when the underlying Redis call fails (degraded boot).
func (s *AgentService) markRunSucceeded(ctx context.Context, runID string) {
if s.runTracker == nil {
return
}
if err := s.runTracker.MarkSucceeded(ctx, runID); err != nil {
common.Warn("service: RunAgent runTracker.MarkSucceeded (best-effort, run not blocked)",
zap.String("run_id", runID),
zap.Error(err))
}
}
// markRunFailed records the run as failed (with reason) via the
// Redis-backed RunTracker. No-op when tracker is nil or the
// underlying Redis call fails.
func (s *AgentService) markRunFailed(ctx context.Context, runID, reason string) {
if s.runTracker == nil {
return
}
if err := s.runTracker.MarkFailed(ctx, runID, reason); err != nil {
common.Warn("service: RunAgent runTracker.MarkFailed (best-effort, run not blocked)",
zap.String("run_id", runID),
zap.String("reason", reason),
zap.Error(err))
}
}
// normalisedDSLForRun returns the DSL map for the given version, or
// nil when the version has no DSL or is missing. The map is a deep
// copy because canvas.Compile mutates some fields in place; reusing
// the same DSL across concurrent runs would race.
func normalisedDSLForRun(v *entity.UserCanvasVersion) map[string]any {
if v == nil || len(v.DSL) == 0 {
return nil
}
return dslpkg.NormalizeForRun(map[string]any(v.DSL))
}
// CancelAgent signals the in-flight run (if any) for the given canvas to
// stop. It is a no-op when no run is currently registered, or when the
// requesting user has no read access to the canvas.
func (s *AgentService) CancelAgent(ctx context.Context, userID, canvasID string) error {
if _, err := s.loadCanvasForUser(ctx, userID, canvasID); err != nil {
return err
}
s.runMu.Lock()
cancel, ok := s.runStreams[canvasID]
s.runMu.Unlock()
if !ok {
return nil
}
select {
case <-cancel:
// already closed
default:
close(cancel)
}
return nil
}