feat(agent): Go port — canvas engine, 22 components, DSL v2, 13 endpoints (#15952)
Ports the agent canvas subsystem from Python to Go.
## What's included
### Canvas Engine (Phase 0/1)
- State engine, scheduler, variable resolver, Redis checkpoint store,
cancel protocol
- **209 tests** across canvas / component / io packages
### 22 Components (P0–P4)
| Tier | Components |
|---|---|
| P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin, Message, Invoke |
| P1 T3 | VariableAggregator, VariableAssigner, StringTransform, ListOperations, DataOperations |
| P2 T3 | Iteration, IterationItem, Loop, LoopItem |
| P3 T3 | UserFillUp, Fillup |
| P4 T5 | Browser, ExcelProcessor, DocsGenerator |
### DSL v2 Schema (Phase 2.5)
- Typed v2 in-memory model with v1-to-v2 auto-detect converter
- v1 legacy field stripping per plan §2.11.7
### HTTP Endpoints & Bug Fixes (Plans PR1–PR3)
- **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)`
pattern
- **CreateAgent validation**: title/DSL required, duplicate check, 103
envelope
- **13 new endpoints**: templates, prompts, tags, sessions CRUD,
chat/completions (SSE + non-stream stubs), rerun, test_db_connection,
logs, webhook/logs
- **756 Go unit tests** (745 → 756, +18)
- **17 → 0 Python integration test failures** (test_agents.py +
test_session_management/)
### Tools
21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory
stubs
### Infrastructure
OTel observability, NATS message queue, DeepDoc gRPC client, SSRF
guards, IDOR mitigation
2026-06-12 22:58:28 +08:00
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

2026-06-17 13:24:03 +08:00
// Package runtime implements per-tenant runtime selection for the
// agent canvas port.
//
// Two pieces live in this package:
//
//   - Selector (this file): reads/writes the per-tenant runtime
//     override in Redis. The default is RuntimeGo; per-tenant
//     overrides still let operators force a tenant back to Python
//     during the agent_api.py deprecation window.
//   - Metrics (metrics.go): Prometheus counter + histogram for
//     per-run observation, keyed by runtime mode.
package runtime

import (
	"context"
	"fmt"
	"os"
	"sync"

	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)

// RuntimeMode identifies which agent-canvas runtime implementation
// serves a given tenant. Supports "go" and "python"; "auto" is
// reserved for future adaptive policies.
type RuntimeMode string

const (
	// RuntimeGo routes the tenant to the Go-side eino
	// implementation. This is the process-wide default.
	RuntimeGo RuntimeMode = "go"
	// RuntimePython routes the tenant to the legacy Python
	// agent_api.py implementation. Retained for the 1-release
	// deprecation window; per-tenant overrides via Selector.Set
	// can still force a tenant to this mode.
	RuntimePython RuntimeMode = "python"
	// RuntimeAuto defers to the per-tenant override, then to the
	// process-wide Default(). It exists as a sentinel for clients that
	// want explicit "I don't care, pick for me" semantics.
	RuntimeAuto RuntimeMode = "auto"
)
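
The three constants above are the only values the package accepts; Default, Select, and Set each repeat the same three-way switch. A minimal sketch of that check as one reusable function follows. `parseMode` is a hypothetical helper that does not appear in the source, and the type and constants are redeclared only so the example compiles on its own.

```go
package main

import "fmt"

// RuntimeMode mirrors the string-backed type from the source file.
type RuntimeMode string

const (
	RuntimeGo     RuntimeMode = "go"
	RuntimePython RuntimeMode = "python"
	RuntimeAuto   RuntimeMode = "auto"
)

// parseMode is a hypothetical helper (not in the source). It accepts only
// the three known modes and reports whether raw was recognized.
func parseMode(raw string) (RuntimeMode, bool) {
	switch m := RuntimeMode(raw); m {
	case RuntimeGo, RuntimePython, RuntimeAuto:
		return m, true
	default:
		return "", false
	}
}

func main() {
	for _, raw := range []string{"go", "python", "auto", "rust", ""} {
		m, ok := parseMode(raw)
		fmt.Printf("%q -> %q recognized=%v\n", raw, m, ok)
	}
}
```

Because RuntimeMode is a plain string type, any string converts to it without error, so an explicit allow-list like this is what keeps unknown values out.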

// defaultEnvKey is the environment variable consulted by Default() when no
// override is registered for a tenant.
const defaultEnvKey = "RAGFLOW_CANVAS_DEFAULT_RUNTIME"

// overrideKeyPrefix is the Redis key namespace for per-tenant runtime
// overrides. Final keys look like "tenant_canvas_runtime:<tenantID>".
const overrideKeyPrefix = "tenant_canvas_runtime:"

var (
	defaultOnce sync.Once
	defaultMode RuntimeMode
)

// Default returns the process-wide default runtime mode.
//
// The default is Go. The per-tenant override (via Selector.Set)
// can still force a tenant back to Python for the 1-release
// deprecation window of agent_api.py.
//
// The value is read once from the RAGFLOW_CANVAS_DEFAULT_RUNTIME env var;
// subsequent calls return the cached result. Unset or unknown env values
// fall back to RuntimeGo (the new default) so a misconfig still lands on
// the Go path.
func Default() RuntimeMode {
	defaultOnce.Do(func() {
		raw := os.Getenv(defaultEnvKey)
		switch RuntimeMode(raw) {
		case RuntimeGo, RuntimePython, RuntimeAuto:
			defaultMode = RuntimeMode(raw)
		default:
			defaultMode = RuntimeGo
		}
	})
	return defaultMode
}
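
The read-once behavior described for Default can be sketched in isolation. The example below is an illustration under assumptions, not the package's code: `defaultFrom`, `resetCache`, `once`, and `cached` are hypothetical names, and the environment lookup is passed in as a function so the caching is easy to observe.

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// RuntimeMode mirrors the string-backed type from the source file.
type RuntimeMode string

const (
	RuntimeGo     RuntimeMode = "go"
	RuntimePython RuntimeMode = "python"
	RuntimeAuto   RuntimeMode = "auto"
)

// envKey is the variable name documented in the source.
const envKey = "RAGFLOW_CANVAS_DEFAULT_RUNTIME"

var (
	once   sync.Once
	cached RuntimeMode
)

// defaultFrom is a hypothetical variant of Default that takes the lookup
// function as a parameter. Only the first call after a reset consults it.
func defaultFrom(getenv func(string) string) RuntimeMode {
	once.Do(func() {
		switch m := RuntimeMode(getenv(envKey)); m {
		case RuntimeGo, RuntimePython, RuntimeAuto:
			cached = m
		default:
			cached = RuntimeGo // unset or unknown values land on Go
		}
	})
	return cached
}

// resetCache re-arms the once guard. Not safe to call concurrently with
// defaultFrom; intended for single-goroutine test setup only.
func resetCache() {
	once = sync.Once{}
	cached = ""
}

func main() {
	os.Setenv(envKey, "python")
	fmt.Println(defaultFrom(os.Getenv)) // first call reads the env var
	os.Setenv(envKey, "go")
	fmt.Println(defaultFrom(os.Getenv)) // cached: the change is not observed
	resetCache()
	fmt.Println(defaultFrom(os.Getenv)) // re-read after the reset
}
```

One consequence of this design is that changing the environment variable in a running process has no effect until the cache is reset, which in practice means a restart.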

// ResetDefaultCache clears the cached default-mode value. Test-only helper;
// it is not safe to call concurrently with Default.
func ResetDefaultCache() {
	defaultOnce = sync.Once{}
	defaultMode = ""
}

// Selector resolves the runtime mode for a tenant at request time. It is
// safe for concurrent use.
type Selector struct {
	redis  *redis.Client
	logger *zap.Logger
}

// NewSelector constructs a Selector backed by the supplied Redis client. A
// nil logger is replaced with zap.NewNop() so callers in tests can omit it.
func NewSelector(rdb *redis.Client, logger *zap.Logger) *Selector {
	if logger == nil {
		logger = zap.NewNop()
	}
	return &Selector{redis: rdb, logger: logger}
}

// overrideKey returns the Redis key for a tenant's runtime override.
func overrideKey(tenantID string) string {
	return overrideKeyPrefix + tenantID
}

// Select returns the runtime mode registered for tenantID. The lookup
// order is:
//
//  1. The Redis key "tenant_canvas_runtime:<tenantID>" if present.
//  2. The process-wide Default() (env RAGFLOW_CANVAS_DEFAULT_RUNTIME,
//     falling back to RuntimeGo).
//
// A nil Redis client short-circuits to the default and never errors.
// A Redis failure or an unrecognized stored value also yields Default(),
// but together with a non-nil error so the caller can decide how to react.
func (s *Selector) Select(ctx context.Context, tenantID string) (RuntimeMode, error) {
	if s == nil || s.redis == nil {
		return Default(), nil
	}
	raw, err := s.redis.Get(ctx, overrideKey(tenantID)).Result()
	if err == redis.Nil {
		return Default(), nil
	}
	if err != nil {
		s.logger.Warn("runtime selector: redis get failed, falling back to default",
			zap.String("tenant_id", tenantID), zap.Error(err))
		return Default(), err
	}
	mode := RuntimeMode(raw)
	switch mode {
	case RuntimeGo, RuntimePython, RuntimeAuto:
		return mode, nil
	default:
		s.logger.Warn("runtime selector: unrecognized value, falling back to default",
			zap.String("tenant_id", tenantID), zap.String("value", raw))
		return Default(), fmt.Errorf("unrecognized runtime mode %q for tenant %q", raw, tenantID)
	}
}
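
The lookup order in Select can be exercised without a Redis server. The sketch below is an illustration under assumptions: `overrideStore`, `mapStore`, `failingStore`, and `selectMode` are hypothetical names, the store interface stands in for the Redis client, the default is passed as a parameter instead of calling Default(), and context plumbing and logging are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// RuntimeMode mirrors the string-backed type from the source file.
type RuntimeMode string

const (
	RuntimeGo     RuntimeMode = "go"
	RuntimePython RuntimeMode = "python"
	RuntimeAuto   RuntimeMode = "auto"
)

// overrideStore is a hypothetical stand-in for the Redis client.
type overrideStore interface {
	Get(key string) (value string, found bool, err error)
}

// mapStore is an in-memory store that never fails.
type mapStore map[string]string

func (m mapStore) Get(key string) (string, bool, error) {
	v, ok := m[key]
	return v, ok, nil
}

// failingStore simulates an unreachable backend.
type failingStore struct{}

func (failingStore) Get(string) (string, bool, error) {
	return "", false, errors.New("store unavailable")
}

// selectMode follows the documented order: per-tenant override first, then
// the default. Failures still return a usable mode next to the error.
func selectMode(s overrideStore, tenantID string, def RuntimeMode) (RuntimeMode, error) {
	if s == nil {
		return def, nil
	}
	raw, found, err := s.Get("tenant_canvas_runtime:" + tenantID)
	if err != nil {
		return def, err
	}
	if !found {
		return def, nil
	}
	switch m := RuntimeMode(raw); m {
	case RuntimeGo, RuntimePython, RuntimeAuto:
		return m, nil
	default:
		return def, fmt.Errorf("unrecognized runtime mode %q for tenant %q", raw, tenantID)
	}
}

func main() {
	store := mapStore{
		"tenant_canvas_runtime:t1": "python",
		"tenant_canvas_runtime:t2": "rust",
	}
	for _, id := range []string{"t1", "t2", "t3"} {
		mode, err := selectMode(store, id, RuntimeGo)
		fmt.Printf("%s -> %s (err: %v)\n", id, mode, err)
	}
	mode, err := selectMode(failingStore{}, "t1", RuntimeGo)
	fmt.Printf("failing store -> %s (err: %v)\n", mode, err)
}
```

Returning a valid mode alongside the error lets a caller log the problem and keep serving the request on the default runtime, at the cost of requiring callers to check both return values.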

// Set overrides the runtime mode for a tenant. The override has no TTL
// (it is permanent until explicitly changed) so the operator does not have
// to remember to re-set it after a Redis flush of short-lived keys. Used
// by the admin runtime endpoint and tests.
func (s *Selector) Set(ctx context.Context, tenantID string, mode RuntimeMode) error {
	if s == nil || s.redis == nil {
		return fmt.Errorf("runtime selector: no redis client configured")
	}
	switch mode {
	case RuntimeGo, RuntimePython, RuntimeAuto:
	default:
		return fmt.Errorf("runtime selector: refusing to set invalid mode %q", mode)
	}
	return s.redis.Set(ctx, overrideKey(tenantID), string(mode), 0).Err()
}
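
Set validates the mode before it writes, so an invalid value never reaches the store. A minimal sketch of that validate-then-write order follows. `memStore` and `setOverride` are hypothetical names, and the map stands in for Redis with keys that never expire, matching the zero expiration passed in the source.

```go
package main

import (
	"errors"
	"fmt"
)

// RuntimeMode mirrors the string-backed type from the source file.
type RuntimeMode string

const (
	RuntimeGo     RuntimeMode = "go"
	RuntimePython RuntimeMode = "python"
	RuntimeAuto   RuntimeMode = "auto"
)

// memStore is a hypothetical in-memory stand-in for Redis; entries persist
// until overwritten, like a key written without a TTL.
type memStore map[string]string

// setOverride rejects a missing store and unknown modes before writing.
func setOverride(store memStore, tenantID string, mode RuntimeMode) error {
	if store == nil {
		return errors.New("no store configured")
	}
	switch mode {
	case RuntimeGo, RuntimePython, RuntimeAuto:
		// Known mode: fall through to the write below.
	default:
		return fmt.Errorf("refusing to set invalid mode %q", mode)
	}
	store["tenant_canvas_runtime:"+tenantID] = string(mode)
	return nil
}

func main() {
	store := memStore{}
	fmt.Println("set python:", setOverride(store, "t1", RuntimePython))
	fmt.Println("set rust:", setOverride(store, "t1", "rust"))
	fmt.Println("stored:", store["tenant_canvas_runtime:t1"])
}
```

Validating on write as well as on read means a bad value can only enter the store through some other writer, which is the case the unrecognized-value branch in Select guards against.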