Files
ragflow/internal/agent/component/parallel_test.go
Zhichang Yu 3fa15c0e2f feat(agent): Go port — canvas engine, 22 components, DSL v2, 13 endpoints (#15952)
Ports the agent canvas subsystem from Python to Go.

## What's included

### Canvas Engine (Phase 0/1)
- State engine, scheduler, variable resolver, Redis checkpoint store,
cancel protocol
- **209 tests** across canvas / component / io packages

### 22 Components (P0–P4)
| Tier | Components |
|---|---|
| P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin,
Message, Invoke |
| P1 T3 | VariableAggregator, VariableAssigner, StringTransform,
ListOperations, DataOperations |
| P2 T3 | Iteration, IterationItem, Loop, LoopItem |
| P3 T3 | UserFillUp, Fillup |
| P4 T5 | Browser, ExcelProcessor, DocsGenerator |

### DSL v2 Schema (Phase 2.5)
- Typed v2 in-memory model with v1-to-v2 auto-detect converter
- v1 legacy field stripping per plan §2.11.7

### HTTP Endpoints & Bug Fixes (Plans PR1–PR3)
- **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)`
pattern
- **CreateAgent validation**: title/DSL required, duplicate check, 103
envelope
- **13 new endpoints**: templates, prompts, tags, sessions CRUD,
chat/completions (SSE + non-stream stubs), rerun, test_db_connection,
logs, webhook/logs
- **756 Go unit tests** (745 → 756, +18)
- **17 → 0 Python integration test failures** (test_agents.py +
test_session_management/)

### Tools
21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory
stubs

### Infrastructure
OTel observability, NATS message queue, DeepDoc gRPC client, SSRF
guards, IDOR mitigation
2026-06-12 22:58:28 +08:00

176 lines
5.7 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package component — Parallel unit tests.
//
// ParallelComponent is a no-op marker in the new architecture: real
// parallel execution is driven by workflowx.AddParallelNode, installed
// by canvas.BuildWorkflow when it sees a Parallel cpn in the DSL. The
// tests in this file exercise the contract ParallelComponent DOES
// expose — registry / factory / param parsing / Name / Inputs / Outputs /
// no-op Invoke / no-op Stream.
//
// Note: "Iteration" and "IterationItem" are still in the registry —
// they are the v1 fixture stubs (v1_stubs.go) that the e2e suite in
// internal/agent/canvas/dsl_examples_e2e_test.go needs to compile the
// v1 DSL examples (iteration.json, headhunter_zh.json). They are NOT
// production components; the real parallel / iteration engine lives
// in canvas/loop_subgraph.go and workflowx.AddLoopNode. The v1 stubs
// are deliberately registered under the v1 names so the e2e path
// resolves the factory to something non-panicking.
package component
import (
"context"
"testing"
)
// TestParallel_Registered confirms "Parallel" is in the registry.
//
// The v1 stub names "Iteration" and "IterationItem" are also in the
// registry by design — see the package comment above. We do NOT
// assert their absence here.
func TestParallel_Registered(t *testing.T) {
names := RegisteredNames()
hasParallel := false
for _, n := range names {
if n == "parallel" {
hasParallel = true
break
}
}
if !hasParallel {
t.Errorf("Parallel not registered; RegisteredNames=%v", names)
}
}
// TestParallel_FactoryReturnsComponent confirms the factory registered
// for "Parallel" produces a Component with the correct name.
func TestParallel_FactoryReturnsComponent(t *testing.T) {
c, err := New("Parallel", map[string]any{
"items_ref": "sys.arr",
"max_concurrency": 5,
})
if err != nil {
t.Fatalf("New(Parallel): %v", err)
}
if c.Name() != "Parallel" {
t.Errorf("Name: got %q, want \"Parallel\"", c.Name())
}
}
// TestParallel_InvokeIsNoOp confirms ParallelComponent.Invoke returns
// an empty map and a nil error. State writes from this method are
// silently dropped by the eino graph because ParallelComponent is not
// registered as an eino node when the macro expansion fires.
func TestParallel_InvokeIsNoOp(t *testing.T) {
c := NewParallelComponent(ParallelParam{
ItemsRef: "sys.arr",
MaxConcurrency: 3,
})
out, err := c.Invoke(context.Background(), map[string]any{"in": 1})
if err != nil {
t.Fatalf("Invoke: %v", err)
}
if len(out) != 0 {
t.Errorf("Invoke should return an empty map, got %v", out)
}
}
// TestParallel_StreamMirrorsInvoke confirms Stream yields exactly one
// empty-map chunk and closes.
func TestParallel_StreamMirrorsInvoke(t *testing.T) {
c := NewParallelComponent(ParallelParam{})
ch, err := c.Stream(context.Background(), nil)
if err != nil {
t.Fatalf("Stream: %v", err)
}
got, ok := <-ch
if !ok {
t.Fatal("Stream channel closed without emitting")
}
if len(got) != 0 {
t.Errorf("Stream chunk: got %v, want empty map", got)
}
if _, open := <-ch; open {
t.Errorf("Stream channel did not close after one chunk")
}
}
// TestParallel_ParamUpdate covers the ParallelParam.Update contract
// for the items_ref and max_concurrency fields. The canvas package's
// buildParallelExpansion reads these from the raw params map directly,
// but ParallelParam.Update is the canonical parser that the factory
// uses; it must round-trip both supported fields.
func TestParallel_ParamUpdate(t *testing.T) {
var p ParallelParam
if err := p.Update(map[string]any{
"items_ref": "sys.arr",
"max_concurrency": 10,
}); err != nil {
t.Fatalf("Update: %v", err)
}
if p.ItemsRef != "sys.arr" {
t.Errorf("ItemsRef: got %q, want \"sys.arr\"", p.ItemsRef)
}
if p.MaxConcurrency != 10 {
t.Errorf("MaxConcurrency: got %d, want 10", p.MaxConcurrency)
}
}
// TestParallel_ParamUpdateNilConf confirms Update(nil) is a no-op.
func TestParallel_ParamUpdateNilConf(t *testing.T) {
var p ParallelParam
if err := p.Update(nil); err != nil {
t.Fatalf("Update(nil): %v", err)
}
if p.ItemsRef != "" {
t.Errorf("ItemsRef: got %q, want empty", p.ItemsRef)
}
if p.MaxConcurrency != 0 {
t.Errorf("MaxConcurrency: got %d, want 0", p.MaxConcurrency)
}
}
// TestParallel_ParamAsDict confirms AsDict round-trips the two
// supported fields when set, and omits them when zero.
func TestParallel_ParamAsDict(t *testing.T) {
p := &ParallelParam{
ItemsRef: "sys.arr",
MaxConcurrency: 0,
}
d := p.AsDict()
if v, _ := d["items_ref"].(string); v != "sys.arr" {
t.Errorf("AsDict items_ref: got %v, want \"sys.arr\"", v)
}
if _, ok := d["max_concurrency"]; ok {
t.Errorf("AsDict: max_concurrency=0 should be omitted")
}
// Zero ParallelParam → empty AsDict.
empty := (&ParallelParam{}).AsDict()
if len(empty) != 0 {
t.Errorf("AsDict zero: got %v, want empty", empty)
}
}
// TestParallel_ParamCheckAlwaysTrue confirms Check is a no-op validator.
func TestParallel_ParamCheckAlwaysTrue(t *testing.T) {
if err := (&ParallelParam{}).Check(); err != nil {
t.Errorf("Check: got %v, want nil", err)
}
}