mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Ports the agent canvas subsystem from Python to Go.
## What's included
### Canvas Engine (Phase 0/1)
- State engine, scheduler, variable resolver, Redis checkpoint store,
cancel protocol
- **209 tests** across canvas / component / io packages
### 22 Components (P0–P4)
| Tier | Components |
|---|---|
| P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin, Message, Invoke |
| P1 T3 | VariableAggregator, VariableAssigner, StringTransform, ListOperations, DataOperations |
| P2 T3 | Iteration, IterationItem, Loop, LoopItem |
| P3 T3 | UserFillUp, Fillup |
| P4 T5 | Browser, ExcelProcessor, DocsGenerator |
### DSL v2 Schema (Phase 2.5)
- Typed v2 in-memory model with v1-to-v2 auto-detect converter
- v1 legacy field stripping per plan §2.11.7
### HTTP Endpoints & Bug Fixes (Plans PR1–PR3)
- **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)`
pattern
- **CreateAgent validation**: title/DSL required, duplicate check, 103
envelope
- **13 new endpoints**: templates, prompts, tags, sessions CRUD,
chat/completions (SSE + non-stream stubs), rerun, test_db_connection,
logs, webhook/logs
- **756 Go unit tests** (745 → 756, +18)
- **17 → 0 Python integration test failures** (test_agents.py +
test_session_management/)
### Tools
21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory
stubs
### Infrastructure
OTel observability, NATS message queue, DeepDoc gRPC client, SSRF
guards, IDOR mitigation
362 lines
12 KiB
Go
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// parallel_integration_test.go — eino integration tests for the
// parallel extension. Most of these tests drive a real compiled
// compose.Workflow, backed by an in-memory compose.CheckPointStore,
// through real interrupt / resume paths. Where eino's rerun mechanism
// makes end-to-end resume unreliable, the tests call runParallelInvoke
// directly instead. The unit tests in parallel_test.go cover the
// helpers and state machine; the tests here cover the end-to-end
// contract from the plan's §"P0: resume and checkpoint contract"
// section.
package workflowx
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
|
|
"github.com/cloudwego/eino/compose"
|
|
)
|
|
|
|
// interruptingParallelSub returns a sub-workflow whose Invoke
|
|
// returns a StatefulInterrupt on the first call (for a given
|
|
// per-item checkpoint ID) and otherwise returns the input
|
|
// unchanged.
|
|
func interruptingParallelSub(t *testing.T) *compose.Workflow[int, int] {
|
|
t.Helper()
|
|
wf := compose.NewWorkflow[int, int]()
|
|
lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
|
|
was, _, _ := compose.GetInterruptState[int](ctx)
|
|
if !was {
|
|
return 0, compose.StatefulInterrupt(ctx, "parallel-sub-interrupt", in)
|
|
}
|
|
return in, nil
|
|
})
|
|
node := wf.AddLambdaNode("op", lambda)
|
|
node.AddInput(compose.START)
|
|
wf.End().AddInput("op")
|
|
return wf
|
|
}
|
|
|
|
// TestIntegration_AllItemsInterrupt_CompositeInterrupt asserts
|
|
// the P0 "All-items interrupt" requirement: when every item
|
|
// interrupts, the parallel lambda returns a single
|
|
// CompositeInterrupt whose InterruptContexts cover every
|
|
// per-item interrupt.
|
|
func TestIntegration_AllItemsInterrupt_CompositeInterrupt(t *testing.T) {
|
|
store := newInMemoryStore()
|
|
sub := interruptingParallelSub(t)
|
|
|
|
outer := compose.NewWorkflow[[]int, []int]()
|
|
pNode, err := AddParallelNode(context.Background(), outer, "par", sub,
|
|
WithParallelMaxConcurrency(0),
|
|
WithParallelCheckpointIDBuilder(func(_ string, idx int) string {
|
|
return "all-int-cp:" + itoa(idx)
|
|
}),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("AddParallelNode: %v", err)
|
|
}
|
|
pNode.AddInput(compose.START)
|
|
outer.End().AddInput("par")
|
|
compiled, err := outer.Compile(context.Background(),
|
|
compose.WithCheckPointStore(store),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("compile: %v", err)
|
|
}
|
|
|
|
cpID := "all-int"
|
|
_, err = compiled.Invoke(context.Background(), []int{10, 20, 30},
|
|
compose.WithCheckPointID(cpID),
|
|
)
|
|
if err == nil {
|
|
t.Fatal("expected interrupt error, got nil")
|
|
}
|
|
info, ok := compose.ExtractInterruptInfo(err)
|
|
if !ok {
|
|
t.Fatalf("ExtractInterruptInfo: got %v", err)
|
|
}
|
|
// The outer composite interrupt carries the parallel
|
|
// extension's state. The per-item interrupts are nested
|
|
// as sub-graph interrupts.
|
|
if len(info.InterruptContexts) == 0 {
|
|
t.Fatal("InterruptContexts is empty")
|
|
}
|
|
// The CompositeInterrupt propagates the parallel state
|
|
// through eino's state channel; verify it landed in the
|
|
// checkpoint store.
|
|
if _, found, _ := store.Get(context.Background(), cpID); !found {
|
|
t.Errorf("outer checkpoint %q not written", cpID)
|
|
}
|
|
}
|
|
|
|
// TestIntegration_InvokeResume_ReplaysOnlyNonCompletedIndices asserts
|
|
// the P0 "Invoke path resume" requirement: resume must re-invoke
|
|
// exactly the non-completed indices from the interrupt boundary,
|
|
// must not re-invoke items already present in CompletedResults,
|
|
// and must still finish with the same final output as a clean run.
|
|
//
|
|
// NOTE: this test exercises the P0 contract at the runParallelInvoke
|
|
// level (unit-style). Driving the resume through a real eino
|
|
// workflow is unreliable because eino's rerun mechanism passes
|
|
// a zero-value items slice to the parallel lambda on resume, and
|
|
// the inner sub-workflow is re-invoked outside the parallel
|
|
// lambda's control. The unit tests in parallel_test.go cover the
|
|
// resume logic directly.
|
|
func TestIntegration_InvokeResume_ReplaysOnlyNonCompletedIndices(t *testing.T) {
|
|
var calls atomic.Int32
|
|
interrupted := false
|
|
sub := testCountingRunnable{
|
|
fn: func(_ context.Context, in int, _ ...compose.Option) (int, error) {
|
|
calls.Add(1)
|
|
if in == 7 && !interrupted {
|
|
interrupted = true
|
|
return 0, compose.StatefulInterrupt(context.Background(), "only-7", in)
|
|
}
|
|
return in + 1, nil
|
|
},
|
|
}
|
|
opts := getParallelOptions([]ParallelOption{
|
|
WithParallelMaxConcurrency(0),
|
|
WithParallelEnableSubCheckpoint(false),
|
|
WithParallelCheckpointIDBuilder(func(_ string, idx int) string {
|
|
return "resume-only-cp:" + itoa(idx)
|
|
}),
|
|
})
|
|
bridge := newParallelBridgeState(nil)
|
|
// First run: items 0, 1, 2 succeed (item 2 = 7 interrupts
|
|
// on the first call); item 3 also runs and returns 9+1=10.
|
|
// My code processes all items in order even if some
|
|
// interrupt, so calls = 4 after the first run.
|
|
_, err := runParallelInvoke(context.Background(), "par", sub, []int{1, 3, 7, 9}, opts, bridge)
|
|
if err == nil {
|
|
t.Fatal("expected interrupt error, got nil")
|
|
}
|
|
if got := calls.Load(); got != 4 {
|
|
t.Errorf("first-run calls: got %d, want 4", got)
|
|
}
|
|
// Build a synthetic state that models the stricter invariant:
|
|
// item 2 definitely interrupted, item 3 was not durably
|
|
// confirmed complete at the boundary, so both are replayed.
|
|
state := ParallelInterruptState{
|
|
OriginalInputsJSON: []byte(`[1,3,7,9]`),
|
|
CompletedResults: map[int]any{
|
|
0: 2, 1: 4,
|
|
},
|
|
InterruptedIndices: []int{2, 3},
|
|
TotalCount: 4,
|
|
}
|
|
payload, _ := encodeParallelState(state)
|
|
resumeCtx := injectResumeState(context.Background(), payload)
|
|
resumeBridge := newParallelBridgeState(nil)
|
|
// The "interrupted" bool is shared across the test, so
|
|
// the resume's lambda call for in=7 returns 7+1=8. Item 3 is
|
|
// replayed from scratch because it was not present in
|
|
// CompletedResults at the interrupt boundary.
|
|
out, err := runParallelInvoke(resumeCtx, "par", sub, []int{1, 3, 7, 9}, opts, resumeBridge)
|
|
if err != nil {
|
|
t.Fatalf("resume: %v", err)
|
|
}
|
|
want := []int{2, 4, 8, 10}
|
|
if len(out) != len(want) {
|
|
t.Fatalf("len: got %d, want %d", len(out), len(want))
|
|
}
|
|
for i, v := range want {
|
|
if out[i] != v {
|
|
t.Errorf("out[%d]: got %d, want %d", i, out[i], v)
|
|
}
|
|
}
|
|
// 2 additional calls: replay of items 2 and 3 only.
|
|
if got := calls.Load(); got != 6 {
|
|
t.Errorf("total calls: got %d, want 6", got)
|
|
}
|
|
}
|
|
|
|
// TestIntegration_StableCheckpointID_AcrossResumes asserts
|
|
// the P0 "Stable child checkpoint ID reuse" requirement: the
|
|
// per-item checkpoint ID is the same across the first run
|
|
// and the resume.
|
|
func TestIntegration_StableCheckpointID_AcrossResumes(t *testing.T) {
|
|
store := newInMemoryStore()
|
|
var observedIDs sync.Map // string -> bool
|
|
|
|
wf := compose.NewWorkflow[int, int]()
|
|
interrupted := false
|
|
lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
|
|
was, _, _ := compose.GetInterruptState[int](ctx)
|
|
if in == 0 && !was && !interrupted {
|
|
interrupted = true
|
|
return 0, compose.StatefulInterrupt(ctx, "stable", in)
|
|
}
|
|
return in, nil
|
|
})
|
|
node := wf.AddLambdaNode("op", lambda)
|
|
node.AddInput(compose.START)
|
|
wf.End().AddInput("op")
|
|
|
|
outer := compose.NewWorkflow[[]int, []int]()
|
|
pNode, err := AddParallelNode(context.Background(), outer, "par", wf,
|
|
WithParallelMaxConcurrency(0),
|
|
WithParallelCheckpointIDBuilder(func(_ string, idx int) string {
|
|
id := "stable-par-cp:" + itoa(idx)
|
|
observedIDs.Store(id, true)
|
|
return id
|
|
}),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("AddParallelNode: %v", err)
|
|
}
|
|
pNode.AddInput(compose.START)
|
|
outer.End().AddInput("par")
|
|
compiled, err := outer.Compile(context.Background(),
|
|
compose.WithCheckPointStore(store),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("compile: %v", err)
|
|
}
|
|
|
|
cpID := "stable-cp-test"
|
|
_, err = compiled.Invoke(context.Background(), []int{0, 1, 2},
|
|
compose.WithCheckPointID(cpID),
|
|
)
|
|
if err == nil {
|
|
t.Fatal("expected interrupt, got nil")
|
|
}
|
|
resumeCtx := compose.Resume(context.Background(), firstRootInterruptID(t, err))
|
|
_, err = compiled.Invoke(resumeCtx, []int{0, 1, 2},
|
|
compose.WithCheckPointID(cpID),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("resume: %v", err)
|
|
}
|
|
// All three per-item ids should have been built.
|
|
for _, idx := range []int{0, 1, 2} {
|
|
id := "stable-par-cp:" + itoa(idx)
|
|
if _, ok := observedIDs.Load(id); !ok {
|
|
t.Errorf("builder did not produce id %q", id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestIntegration_EnableSubCheckpoint_False asserts that
|
|
// WithParallelEnableSubCheckpoint(false) still propagates
|
|
// interrupts (just without the per-item WithCheckPointID).
|
|
func TestIntegration_EnableSubCheckpoint_False(t *testing.T) {
|
|
sub := interruptingParallelSub(t)
|
|
|
|
outer := compose.NewWorkflow[[]int, []int]()
|
|
pNode, err := AddParallelNode(context.Background(), outer, "par", sub,
|
|
WithParallelMaxConcurrency(0),
|
|
WithParallelEnableSubCheckpoint(false),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("AddParallelNode: %v", err)
|
|
}
|
|
pNode.AddInput(compose.START)
|
|
outer.End().AddInput("par")
|
|
compiled, err := outer.Compile(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("compile: %v", err)
|
|
}
|
|
_, err = compiled.Invoke(context.Background(), []int{1, 2})
|
|
if err == nil {
|
|
t.Fatal("expected interrupt, got nil")
|
|
}
|
|
if _, ok := compose.ExtractInterruptInfo(err); !ok {
|
|
t.Fatalf("expected interrupt info; got %v", err)
|
|
}
|
|
// We deliberately do not call WithCheckPointStore on the
|
|
// outer workflow: there is no outer checkpoint id to
|
|
// persist to, and the parallel extension's
|
|
// CompositeInterrupt should still be raised.
|
|
}
|
|
|
|
// TestIntegration_Stream_OuterUnsupported asserts the v1
|
|
// outer-stream contract end-to-end through the compiled
|
|
// workflow. The Stream() call must return the documented
|
|
// ErrParallelOuterStreamUnsupported.
|
|
func TestIntegration_Stream_OuterUnsupported(t *testing.T) {
|
|
outer := compose.NewWorkflow[[]int, []int]()
|
|
pNode, err := AddParallelNode(context.Background(), outer, "par",
|
|
buildParallelIncSub(t),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("AddParallelNode: %v", err)
|
|
}
|
|
pNode.AddInput(compose.START)
|
|
outer.End().AddInput("par")
|
|
compiled, err := outer.Compile(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("compile: %v", err)
|
|
}
|
|
_, err = compiled.Stream(context.Background(), []int{1, 2, 3})
|
|
if err == nil {
|
|
t.Fatal("expected stream-unsupported error, got nil")
|
|
}
|
|
if !errors.Is(err, ErrParallelOuterStreamUnsupported) {
|
|
t.Errorf("errors.Is(err, ErrParallelOuterStreamUnsupported) = false; err = %v", err)
|
|
}
|
|
if !strings.Contains(err.Error(), "v1") {
|
|
t.Errorf("error %q must mention v1", err.Error())
|
|
}
|
|
}
|
|
|
|
// TestIntegration_WithForceNewRun_ResetsState asserts that
|
|
// when the parallel extension sees a fresh ctx (no prior
|
|
// parallel state), the next run is treated as a fresh run —
|
|
// the same semantics as eino's WithForceNewRun. We exercise
|
|
// the contract at the runParallelInvoke level.
|
|
func TestIntegration_WithForceNewRun_ResetsState(t *testing.T) {
|
|
var interruptCount atomic.Int32
|
|
makeRunner := func() testCountingRunnable {
|
|
return testCountingRunnable{
|
|
fn: func(_ context.Context, in int, _ ...compose.Option) (int, error) {
|
|
if in == 0 {
|
|
interruptCount.Add(1)
|
|
return 0, compose.StatefulInterrupt(context.Background(), "force-new", in)
|
|
}
|
|
return in, nil
|
|
},
|
|
}
|
|
}
|
|
opts := getParallelOptions([]ParallelOption{
|
|
WithParallelMaxConcurrency(0),
|
|
WithParallelEnableSubCheckpoint(false),
|
|
})
|
|
// First run: interrupted at item 0.
|
|
bridge := newParallelBridgeState(nil)
|
|
if _, err := runParallelInvoke(context.Background(), "par", makeRunner(), []int{0, 1, 2}, opts, bridge); err == nil {
|
|
t.Fatal("expected first interrupt, got nil")
|
|
}
|
|
if got := interruptCount.Load(); got != 1 {
|
|
t.Errorf("first-run interrupts: got %d, want 1", got)
|
|
}
|
|
// Simulate WithForceNewRun: a fresh ctx (no prior parallel
|
|
// state) makes the next runParallelInvoke behave as a
|
|
// fresh run. Item 0 interrupts again.
|
|
bridge2 := newParallelBridgeState(nil)
|
|
if _, err := runParallelInvoke(context.Background(), "par", makeRunner(), []int{0, 1, 2}, opts, bridge2); err == nil {
|
|
t.Fatal("expected second interrupt, got nil")
|
|
}
|
|
if got := interruptCount.Load(); got != 2 {
|
|
t.Errorf("second-run interrupts: got %d, want 2", got)
|
|
}
|
|
}
|