Files
ragflow/internal/agent/workflowx/parallel_integration_test.go
Zhichang Yu 3fa15c0e2f feat(agent): Go port — canvas engine, 22 components, DSL v2, 13 endpoints (#15952)
Ports the agent canvas subsystem from Python to Go.

## What's included

### Canvas Engine (Phase 0/1)
- State engine, scheduler, variable resolver, Redis checkpoint store,
cancel protocol
- **209 tests** across canvas / component / io packages

### 22 Components (P0–P4)
| Tier | Components |
|---|---|
| P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin, Message, Invoke |
| P1 T3 | VariableAggregator, VariableAssigner, StringTransform, ListOperations, DataOperations |
| P2 T3 | Iteration, IterationItem, Loop, LoopItem |
| P3 T3 | UserFillUp, Fillup |
| P4 T5 | Browser, ExcelProcessor, DocsGenerator |

### DSL v2 Schema (Phase 2.5)
- Typed v2 in-memory model with v1-to-v2 auto-detect converter
- v1 legacy field stripping per plan §2.11.7

### HTTP Endpoints & Bug Fixes (Plans PR1–PR3)
- **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)`
pattern
- **CreateAgent validation**: title/DSL required, duplicate check, 103
envelope
- **13 new endpoints**: templates, prompts, tags, sessions CRUD,
chat/completions (SSE + non-stream stubs), rerun, test_db_connection,
logs, webhook/logs
- **756 Go unit tests** (745 → 756, +18)
- **17 → 0 Python integration test failures** (test_agents.py +
test_session_management/)

### Tools
21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory
stubs

### Infrastructure
OTel observability, NATS message queue, DeepDoc gRPC client, SSRF
guards, IDOR mitigation
2026-06-12 22:58:28 +08:00

362 lines
12 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// parallel_integration_test.go — full eino integration tests
// for the parallel extension. These tests use a real
// compose.Workflow, real compose.CheckPointStore, and real
// interrupt / resume paths. The unit tests in parallel_test.go
// cover the helpers and state machine; the integration tests
// here cover the end-to-end contract from the plan's
// §"P0: resume and checkpoint contract" section.
package workflowx
import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/cloudwego/eino/compose"
)
// interruptingParallelSub builds a one-node int -> int sub-workflow for
// the parallel extension tests. Its single lambda node ("op") raises a
// StatefulInterrupt (info "parallel-sub-interrupt", state = the input)
// whenever compose.GetInterruptState reports that the current run was not
// previously interrupted — i.e. on the first call for a given per-item
// checkpoint ID — and otherwise echoes its input unchanged.
func interruptingParallelSub(t *testing.T) *compose.Workflow[int, int] {
	t.Helper()

	interruptOnFirstCall := func(ctx context.Context, in int) (int, error) {
		if resumed, _, _ := compose.GetInterruptState[int](ctx); resumed {
			return in, nil
		}
		return 0, compose.StatefulInterrupt(ctx, "parallel-sub-interrupt", in)
	}

	sub := compose.NewWorkflow[int, int]()
	sub.AddLambdaNode("op", compose.InvokableLambda(interruptOnFirstCall)).
		AddInput(compose.START)
	sub.End().AddInput("op")
	return sub
}
// TestIntegration_AllItemsInterrupt_CompositeInterrupt asserts the P0
// "All-items interrupt" requirement end to end: when every item of the
// parallel node interrupts, the compiled outer workflow fails with an
// interrupt error that compose.ExtractInterruptInfo can decode, and the
// outer checkpoint is persisted to the store under the caller's
// checkpoint ID.
//
// NOTE(review): the requirement says the single CompositeInterrupt's
// InterruptContexts cover every per-item interrupt; this test only checks
// that at least one context is present. Asserting one context per item
// depends on how eino surfaces nested sub-graph interrupts — confirm
// before tightening the assertion.
func TestIntegration_AllItemsInterrupt_CompositeInterrupt(t *testing.T) {
	ctx := context.Background()
	store := newInMemoryStore()
	sub := interruptingParallelSub(t)

	outer := compose.NewWorkflow[[]int, []int]()
	pNode, err := AddParallelNode(ctx, outer, "par", sub,
		WithParallelMaxConcurrency(0),
		WithParallelCheckpointIDBuilder(func(_ string, idx int) string {
			return "all-int-cp:" + itoa(idx)
		}),
	)
	if err != nil {
		t.Fatalf("AddParallelNode: %v", err)
	}
	pNode.AddInput(compose.START)
	outer.End().AddInput("par")

	compiled, err := outer.Compile(ctx, compose.WithCheckPointStore(store))
	if err != nil {
		t.Fatalf("compile: %v", err)
	}

	const cpID = "all-int"
	_, err = compiled.Invoke(ctx, []int{10, 20, 30}, compose.WithCheckPointID(cpID))
	if err == nil {
		t.Fatal("expected interrupt error, got nil")
	}
	info, ok := compose.ExtractInterruptInfo(err)
	if !ok {
		t.Fatalf("ExtractInterruptInfo: got %v", err)
	}
	// The outer composite interrupt carries the parallel extension's
	// state; the per-item interrupts are nested as sub-graph interrupts.
	if len(info.InterruptContexts) == 0 {
		t.Fatal("InterruptContexts is empty")
	}

	// The CompositeInterrupt propagates the parallel state through eino's
	// state channel; verify it landed in the checkpoint store. A store
	// error is reported as such rather than being folded into "not found".
	_, found, err := store.Get(ctx, cpID)
	if err != nil {
		t.Fatalf("store.Get(%q): %v", cpID, err)
	}
	if !found {
		t.Errorf("outer checkpoint %q not written", cpID)
	}
}
// TestIntegration_InvokeResume_ReplaysOnlyNonCompletedIndices asserts
// the P0 "Invoke path resume" requirement: resume must re-invoke
// exactly the non-completed indices from the interrupt boundary,
// must not re-invoke items already present in CompletedResults,
// and must still finish with the same final output as a clean run.
//
// NOTE: this test exercises the P0 contract at the runParallelInvoke
// level (unit-style). Driving the resume through a real eino
// workflow is unreliable because eino's rerun mechanism passes
// a zero-value items slice to the parallel lambda on resume, and
// the inner sub-workflow is re-invoked outside the parallel
// lambda's control. The unit tests in parallel_test.go cover the
// resume logic directly.
func TestIntegration_InvokeResume_ReplaysOnlyNonCompletedIndices(t *testing.T) {
	var calls atomic.Int32
	// interrupted latches the first call for in == 7, so that item
	// interrupts exactly once across both runs. It is atomic because the
	// sub may be invoked from per-item goroutines
	// (WithParallelMaxConcurrency(0)), just like the calls counter.
	var interrupted atomic.Bool
	sub := testCountingRunnable{
		fn: func(_ context.Context, in int, _ ...compose.Option) (int, error) {
			calls.Add(1)
			if in == 7 && interrupted.CompareAndSwap(false, true) {
				return 0, compose.StatefulInterrupt(context.Background(), "only-7", in)
			}
			return in + 1, nil
		},
	}
	opts := getParallelOptions([]ParallelOption{
		WithParallelMaxConcurrency(0),
		WithParallelEnableSubCheckpoint(false),
		WithParallelCheckpointIDBuilder(func(_ string, idx int) string {
			return "resume-only-cp:" + itoa(idx)
		}),
	})

	// First run: items 0 and 1 succeed, item 2 (value 7) interrupts on its
	// first call, and item 3 (value 9) still runs and returns 10 —
	// runParallelInvoke invokes every item even when one of them
	// interrupts, so calls == 4 after the first run.
	bridge := newParallelBridgeState(nil)
	_, err := runParallelInvoke(context.Background(), "par", sub, []int{1, 3, 7, 9}, opts, bridge)
	if err == nil {
		t.Fatal("expected interrupt error, got nil")
	}
	if got := calls.Load(); got != 4 {
		t.Errorf("first-run calls: got %d, want 4", got)
	}

	// Build a synthetic state that models the stricter invariant:
	// item 2 definitely interrupted, item 3 was not durably
	// confirmed complete at the boundary, so both are replayed.
	state := ParallelInterruptState{
		OriginalInputsJSON: []byte(`[1,3,7,9]`),
		CompletedResults: map[int]any{
			0: 2, 1: 4,
		},
		InterruptedIndices: []int{2, 3},
		TotalCount:         4,
	}
	// An encoding failure must fail here, not masquerade as a resume bug.
	payload, err := encodeParallelState(state)
	if err != nil {
		t.Fatalf("encodeParallelState: %v", err)
	}
	resumeCtx := injectResumeState(context.Background(), payload)
	resumeBridge := newParallelBridgeState(nil)

	// The interrupted latch is shared across both runs, so on resume the
	// call for in=7 returns 7+1=8. Item 3 is replayed from scratch because
	// it was not present in CompletedResults at the interrupt boundary.
	out, err := runParallelInvoke(resumeCtx, "par", sub, []int{1, 3, 7, 9}, opts, resumeBridge)
	if err != nil {
		t.Fatalf("resume: %v", err)
	}
	want := []int{2, 4, 8, 10}
	if len(out) != len(want) {
		t.Fatalf("len: got %d, want %d", len(out), len(want))
	}
	for i, v := range want {
		if out[i] != v {
			t.Errorf("out[%d]: got %d, want %d", i, out[i], v)
		}
	}
	// 2 additional calls: replay of items 2 and 3 only.
	if got := calls.Load(); got != 6 {
		t.Errorf("total calls: got %d, want 6", got)
	}
}
// TestIntegration_StableCheckpointID_AcrossResumes asserts the P0
// "Stable child checkpoint ID reuse" requirement: the per-item checkpoint
// IDs built while resuming are IDs that were already built for those
// items in the first run.
//
// IDs are recorded per phase (first run vs. resume). The first run must
// build an ID for every item, and every ID built during the resume must
// have been built in the first run too — a resume that requested an ID
// for an index the first run never used would break child checkpoint
// reuse.
func TestIntegration_StableCheckpointID_AcrossResumes(t *testing.T) {
	store := newInMemoryStore()

	// The ID builder may be called from per-item goroutines (presumably
	// unbounded via WithParallelMaxConcurrency(0)), so the per-phase ID
	// sets are guarded by mu. Phase 0 is the first run, phase 1 the resume.
	var (
		mu       sync.Mutex
		phase    int
		idsBuilt = [2]map[string]struct{}{{}, {}}
	)

	// Item 0 interrupts exactly once: on its first call, when eino reports
	// no prior interrupt. The interrupted latch makes that a one-shot.
	var interrupted atomic.Bool
	wf := compose.NewWorkflow[int, int]()
	lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
		was, _, _ := compose.GetInterruptState[int](ctx)
		if in == 0 && !was && interrupted.CompareAndSwap(false, true) {
			return 0, compose.StatefulInterrupt(ctx, "stable", in)
		}
		return in, nil
	})
	node := wf.AddLambdaNode("op", lambda)
	node.AddInput(compose.START)
	wf.End().AddInput("op")

	outer := compose.NewWorkflow[[]int, []int]()
	pNode, err := AddParallelNode(context.Background(), outer, "par", wf,
		WithParallelMaxConcurrency(0),
		WithParallelCheckpointIDBuilder(func(_ string, idx int) string {
			id := "stable-par-cp:" + itoa(idx)
			mu.Lock()
			idsBuilt[phase][id] = struct{}{}
			mu.Unlock()
			return id
		}),
	)
	if err != nil {
		t.Fatalf("AddParallelNode: %v", err)
	}
	pNode.AddInput(compose.START)
	outer.End().AddInput("par")
	compiled, err := outer.Compile(context.Background(),
		compose.WithCheckPointStore(store),
	)
	if err != nil {
		t.Fatalf("compile: %v", err)
	}

	cpID := "stable-cp-test"
	_, err = compiled.Invoke(context.Background(), []int{0, 1, 2},
		compose.WithCheckPointID(cpID),
	)
	if err == nil {
		t.Fatal("expected interrupt, got nil")
	}

	// Everything the builder records from here on belongs to the resume.
	mu.Lock()
	phase = 1
	mu.Unlock()

	resumeCtx := compose.Resume(context.Background(), firstRootInterruptID(t, err))
	_, err = compiled.Invoke(resumeCtx, []int{0, 1, 2},
		compose.WithCheckPointID(cpID),
	)
	if err != nil {
		t.Fatalf("resume: %v", err)
	}

	mu.Lock()
	defer mu.Unlock()
	firstRun, resume := idsBuilt[0], idsBuilt[1]
	// The first run built an ID for every item.
	for _, idx := range []int{0, 1, 2} {
		id := "stable-par-cp:" + itoa(idx)
		if _, ok := firstRun[id]; !ok {
			t.Errorf("first run did not build id %q", id)
		}
	}
	// The resume only reused IDs the first run produced.
	for id := range resume {
		if _, ok := firstRun[id]; !ok {
			t.Errorf("resume built id %q that the first run never produced", id)
		}
	}
}
// TestIntegration_EnableSubCheckpoint_False asserts that disabling
// per-item checkpoints via WithParallelEnableSubCheckpoint(false) does not
// swallow interrupts: the items are run without a per-item
// WithCheckPointID, yet the compiled outer workflow still fails with an
// error from which compose.ExtractInterruptInfo recovers interrupt info.
//
// The outer workflow is deliberately compiled without
// WithCheckPointStore: there is no outer checkpoint ID to persist to, and
// the parallel extension's CompositeInterrupt should still be raised.
func TestIntegration_EnableSubCheckpoint_False(t *testing.T) {
	bg := context.Background()
	itemFlow := interruptingParallelSub(t)

	outer := compose.NewWorkflow[[]int, []int]()
	parNode, addErr := AddParallelNode(bg, outer, "par", itemFlow,
		WithParallelMaxConcurrency(0),
		WithParallelEnableSubCheckpoint(false),
	)
	if addErr != nil {
		t.Fatalf("AddParallelNode: %v", addErr)
	}
	parNode.AddInput(compose.START)
	outer.End().AddInput("par")

	runner, compileErr := outer.Compile(bg)
	if compileErr != nil {
		t.Fatalf("compile: %v", compileErr)
	}

	_, runErr := runner.Invoke(bg, []int{1, 2})
	if runErr == nil {
		t.Fatal("expected interrupt, got nil")
	}
	if _, isInterrupt := compose.ExtractInterruptInfo(runErr); !isInterrupt {
		t.Fatalf("expected interrupt info; got %v", runErr)
	}
}
// TestIntegration_Stream_OuterUnsupported checks the v1 outer-stream
// contract through a compiled workflow: calling Stream on a workflow that
// contains a parallel node must fail with an error that wraps
// ErrParallelOuterStreamUnsupported and whose message mentions "v1".
func TestIntegration_Stream_OuterUnsupported(t *testing.T) {
	bg := context.Background()

	outer := compose.NewWorkflow[[]int, []int]()
	parNode, addErr := AddParallelNode(bg, outer, "par", buildParallelIncSub(t))
	if addErr != nil {
		t.Fatalf("AddParallelNode: %v", addErr)
	}
	parNode.AddInput(compose.START)
	outer.End().AddInput("par")

	runner, compileErr := outer.Compile(bg)
	if compileErr != nil {
		t.Fatalf("compile: %v", compileErr)
	}

	_, streamErr := runner.Stream(bg, []int{1, 2, 3})
	if streamErr == nil {
		t.Fatal("expected stream-unsupported error, got nil")
	}
	if !errors.Is(streamErr, ErrParallelOuterStreamUnsupported) {
		t.Errorf("errors.Is(err, ErrParallelOuterStreamUnsupported) = false; err = %v", streamErr)
	}
	if msg := streamErr.Error(); !strings.Contains(msg, "v1") {
		t.Errorf("error %q must mention v1", msg)
	}
}
// TestIntegration_WithForceNewRun_ResetsState asserts that when the
// parallel extension sees a fresh ctx (no prior parallel resume state)
// and a fresh bridge state, the run is treated as a brand-new run —
// mirroring the semantics of eino's WithForceNewRun. eino's option itself
// is not used; the contract is exercised directly at the
// runParallelInvoke level by running the same interrupting input twice.
//
// A fresh run must re-invoke every item, not only the one that
// interrupted last time. The test therefore counts every invocation as
// well as the interrupts, so completed results leaking from the first run
// into the second (items 1 and 2 being skipped) would be caught.
func TestIntegration_WithForceNewRun_ResetsState(t *testing.T) {
	var interruptCount, callCount atomic.Int32
	makeRunner := func() testCountingRunnable {
		return testCountingRunnable{
			fn: func(_ context.Context, in int, _ ...compose.Option) (int, error) {
				callCount.Add(1)
				if in == 0 {
					interruptCount.Add(1)
					return 0, compose.StatefulInterrupt(context.Background(), "force-new", in)
				}
				return in, nil
			},
		}
	}
	opts := getParallelOptions([]ParallelOption{
		WithParallelMaxConcurrency(0),
		WithParallelEnableSubCheckpoint(false),
	})

	// First run: interrupted at item 0. runParallelInvoke still invokes
	// the remaining items, so all three are called once.
	bridge := newParallelBridgeState(nil)
	if _, err := runParallelInvoke(context.Background(), "par", makeRunner(), []int{0, 1, 2}, opts, bridge); err == nil {
		t.Fatal("expected first interrupt, got nil")
	}
	if got := interruptCount.Load(); got != 1 {
		t.Errorf("first-run interrupts: got %d, want 1", got)
	}
	if got := callCount.Load(); got != 3 {
		t.Errorf("first-run calls: got %d, want 3", got)
	}

	// Simulate WithForceNewRun: a fresh ctx (no prior parallel state) and
	// a fresh bridge make the next runParallelInvoke behave as a fresh
	// run. Item 0 interrupts again and every item is re-invoked.
	bridge2 := newParallelBridgeState(nil)
	if _, err := runParallelInvoke(context.Background(), "par", makeRunner(), []int{0, 1, 2}, opts, bridge2); err == nil {
		t.Fatal("expected second interrupt, got nil")
	}
	if got := interruptCount.Load(); got != 2 {
		t.Errorf("second-run interrupts: got %d, want 2", got)
	}
	if got := callCount.Load(); got != 6 {
		t.Errorf("total calls after fresh run: got %d, want 6", got)
	}
}