Files
ragflow/internal/agent/workflowx/loop_integration_test.go
Zhichang Yu 3fa15c0e2f feat(agent): Go port — canvas engine, 22 components, DSL v2, 13 endpoints (#15952)
Ports the agent canvas subsystem from Python to Go.

## What's included

### Canvas Engine (Phase 0/1)
- State engine, scheduler, variable resolver, Redis checkpoint store,
cancel protocol
- **209 tests** across canvas / component / io packages

### 22 Components (P0–P4)
| Tier | Components |
|---|---|
| P0 T1+T2+T3 | LLM, Agent, ExitLoop, Switch, Categorize, Begin,
Message, Invoke |
| P1 T3 | VariableAggregator, VariableAssigner, StringTransform,
ListOperations, DataOperations |
| P2 T3 | Iteration, IterationItem, Loop, LoopItem |
| P3 T3 | UserFillUp, Fillup |
| P4 T5 | Browser, ExcelProcessor, DocsGenerator |

### DSL v2 Schema (Phase 2.5)
- Typed v2 in-memory model with v1-to-v2 auto-detect converter
- v1 legacy field stripping per plan §2.11.7

### HTTP Endpoints & Bug Fixes (Plans PR1–PR3)
- **DELETE SQL bug fix**: gorm v2 `Where("id = ?", id).Delete(...)`
pattern
- **CreateAgent validation**: title/DSL required, duplicate check, 103
envelope
- **13 new endpoints**: templates, prompts, tags, sessions CRUD,
chat/completions (SSE + non-stream stubs), rerun, test_db_connection,
logs, webhook/logs
- **756 Go unit tests** (745 → 756, +18)
- **17 → 0 Python integration test failures** (test_agents.py +
test_session_management/)

### Tools
21 eino tools: HTTPHelper, search tools, financial/data tools, mandatory
stubs

### Infrastructure
OTel observability, NATS message queue, DeepDoc gRPC client, SSRF
guards, IDOR mitigation
2026-06-12 22:58:28 +08:00

960 lines
30 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// loop_integration_test.go — full eino integration tests for the
// loop extension. These tests use real compose.Runnable + real
// compose.CheckPointStore to exercise the documented interrupt /
// resume contract from the plan's §"P0: resume and checkpoint
// contract" and §"P0: replay and side effects" sections.
//
// Note on sentinel-error assertions: the eino framework
// re-wraps interrupt errors at the runner boundary, so
// errors.Is(returnedErr, ErrLoopSubGraphInterrupted) may
// return false even when the loop's lambda did emit the
// sentinel. The integration tests therefore check the
// contract via ExtractInterruptInfo plus the loop-local
// state stored in the outer checkpoint. The unit tests in
// loop_test.go cover the errors.Is path for the four
// sentinels (no framework re-wrap happens on the unit
// path because the loop returns plain errors, not
// composite interrupts).
package workflowx
import (
"context"
"errors"
"io"
"sync/atomic"
"testing"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
// interruptingSub is a tiny sub-workflow that interrupts on its
// first Invoke for a given checkpoint ID, then succeeds.
func interruptingSub(t *testing.T) *compose.Workflow[int, int] {
t.Helper()
wf := compose.NewWorkflow[int, int]()
lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
was, _, _ := compose.GetInterruptState[int](ctx)
if !was {
return 0, compose.StatefulInterrupt(ctx, "sub-interrupt", in)
}
return in + 1, nil
})
node := wf.AddLambdaNode("inc", lambda)
node.AddInput(compose.START)
wf.End().AddInput("inc")
return wf
}
// counterSub is a non-interrupting sub-workflow whose every call
// increments a counter. Used for max-iter and per-iteration tests.
func counterSub(t *testing.T, counter *atomic.Int64) *compose.Workflow[int, int] {
t.Helper()
wf := compose.NewWorkflow[int, int]()
lambda := compose.InvokableLambda(func(_ context.Context, in int) (int, error) {
counter.Add(1)
return in + 1, nil
})
node := wf.AddLambdaNode("inc", lambda)
node.AddInput(compose.START)
wf.End().AddInput("inc")
return wf
}
func firstRootInterruptID(t *testing.T, err error) string {
t.Helper()
info, ok := extractInterruptInfoDeep(err)
if !ok {
t.Fatalf("ExtractInterruptInfo: got %v", err)
}
if len(info.InterruptContexts) == 0 {
t.Fatal("InterruptContexts is empty")
}
for _, ctx := range info.InterruptContexts {
if ctx.IsRootCause {
return ctx.ID
}
}
return info.InterruptContexts[0].ID
}
func extractInterruptInfoDeep(err error) (*compose.InterruptInfo, bool) {
if err == nil {
return nil, false
}
if info, ok := compose.ExtractInterruptInfo(err); ok {
return info, true
}
type multiUnwrapper interface {
Unwrap() []error
}
if mw, ok := err.(multiUnwrapper); ok {
for _, sub := range mw.Unwrap() {
if info, ok := extractInterruptInfoDeep(sub); ok {
return info, true
}
}
}
if unwrapped := errors.Unwrap(err); unwrapped != nil {
return extractInterruptInfoDeep(unwrapped)
}
return nil, false
}
func readAllInts(t *testing.T, sr *schema.StreamReader[int]) ([]int, error) {
t.Helper()
defer sr.Close()
var out []int
for {
v, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return out, nil
}
return out, err
}
out = append(out, v)
}
}
func drainStreamUntilError(t *testing.T, sr *schema.StreamReader[int]) ([]int, error) {
t.Helper()
defer sr.Close()
var out []int
for {
v, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return out, nil
}
return out, err
}
out = append(out, v)
}
}
// TestIntegration_OuterVsInnerCallback_Counts asserts the P1
// "Outer callbacks versus inner callbacks" requirement: the
// sub-workflow sees one execution per iteration.
func TestIntegration_OuterVsInnerCallback_Counts(t *testing.T) {
var subCalls atomic.Int64
subStore := newInMemoryStore()
sub := counterSub(t, &subCalls)
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 3, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(10),
WithLoopCompileOptions(compose.WithCheckPointStore(subStore)),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "sub-cp:loop:iter:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
if _, err := compiled.Invoke(context.Background(), 0); err != nil {
t.Fatalf("invoke: %v", err)
}
if got := subCalls.Load(); got != 3 {
t.Errorf("sub invocations: got %d, want 3", got)
}
}
// TestIntegration_SubWorkflowInterrupt_PropagatedAsComposite
// asserts the basic interrupt propagation contract: when the
// sub-workflow interrupts, the loop returns an error from which
// the original interrupt info is recoverable via
// ExtractInterruptInfo. The "sub-interrupt" string MUST appear in
// the InterruptInfo tree because that is how downstream callers
// distinguish a loop-internal interrupt from a user-level one.
func TestIntegration_SubWorkflowInterrupt_PropagatedAsComposite(t *testing.T) {
outerStore := newInMemoryStore()
subStore := newInMemoryStore()
sub := interruptingSub(t)
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
return true, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(5),
WithLoopCompileOptions(compose.WithCheckPointStore(subStore)),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "sub-cp:loop:iter:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(),
compose.WithCheckPointStore(outerStore),
)
if err != nil {
t.Fatalf("compile: %v", err)
}
_, err = compiled.Invoke(context.Background(), 0,
compose.WithCheckPointID("outer-cp"),
)
if err == nil {
t.Fatal("expected interrupt error, got nil")
}
info, ok := compose.ExtractInterruptInfo(err)
if !ok {
t.Fatalf("ExtractInterruptInfo: got %v", err)
}
if len(info.InterruptContexts) == 0 {
t.Fatal("InterruptContexts is empty")
}
foundSubInterrupt := false
var walk func(*compose.InterruptInfo)
walk = func(i *compose.InterruptInfo) {
if i == nil {
return
}
for _, ctx := range i.InterruptContexts {
if s, ok := ctx.Info.(string); ok && s == "sub-interrupt" {
foundSubInterrupt = true
}
}
for _, sub := range i.SubGraphs {
walk(sub)
}
}
walk(info)
if !foundSubInterrupt {
t.Errorf("InterruptInfo tree does not mention 'sub-interrupt'")
}
}
// TestIntegration_LoopStatePersistedOnInterrupt asserts that when
// the sub-workflow interrupts, the outer checkpoint payload
// exists (i.e. the framework has written the loop's state).
func TestIntegration_LoopStatePersistedOnInterrupt(t *testing.T) {
outerStore := newInMemoryStore()
subStore := newInMemoryStore()
sub := interruptingSub(t)
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
return true, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(5),
WithLoopCompileOptions(compose.WithCheckPointStore(subStore)),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "sub-cp:loop:iter:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(),
compose.WithCheckPointStore(outerStore),
)
if err != nil {
t.Fatalf("compile: %v", err)
}
cpID := "outer-cp-persist"
_, err = compiled.Invoke(context.Background(), 0,
compose.WithCheckPointID(cpID),
)
if err == nil {
t.Fatal("expected interrupt error, got nil")
}
if _, found, _ := outerStore.Get(context.Background(), cpID); !found {
t.Errorf("outer checkpoint %q not written", cpID)
}
}
// TestIntegration_MaxIterationsExceeded_OnInvokePath asserts
// that a sustained (non-converging) loop run surfaces
// ErrLoopMaxIterationsExceeded through the outer invoke. This
// uses a non-interrupting sub-workflow so the loop actually
// reaches the cap.
func TestIntegration_MaxIterationsExceeded_OnInvokePath(t *testing.T) {
var subCalls atomic.Int64
subStore := newInMemoryStore()
sub := counterSub(t, &subCalls)
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
return false, nil // never quits
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(3),
WithLoopCompileOptions(compose.WithCheckPointStore(subStore)),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "sub-cp:loop:iter:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
_, err = compiled.Invoke(context.Background(), 0)
if !errors.Is(err, ErrLoopMaxIterationsExceeded) {
t.Fatalf("got %v, want ErrLoopMaxIterationsExceeded", err)
}
if got := subCalls.Load(); got != 3 {
t.Errorf("sub invocations: got %d, want 3", got)
}
}
// TestIntegration_LoopRunsConcurrentlyWithResumeData checks the
// loop completes the do-while contract end-to-end through a real
// eino workflow with a checkpoint store. Unlike the unit tests
// in loop_test.go, this exercises the full compile/invoke path
// and confirms the loop survives eino's runner.
func TestIntegration_LoopRunsConcurrentlyWithResumeData(t *testing.T) {
store := newInMemoryStore()
sub := interruptingSub(t)
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 2, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(10),
WithLoopCompileOptions(compose.WithCheckPointStore(store)),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "sub-cp:loop:iter:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(),
compose.WithCheckPointStore(store),
)
if err != nil {
t.Fatalf("compile: %v", err)
}
// Run end-to-end. The sub-workflow interrupts on first
// call; the loop must persist state and return an
// interrupt error.
cpID := "outer-cp-e2e"
_, err = compiled.Invoke(context.Background(), 0,
compose.WithCheckPointID(cpID),
)
if err == nil {
t.Fatal("expected interrupt error on first run, got nil")
}
if _, ok := compose.ExtractInterruptInfo(err); !ok {
t.Errorf("expected interrupt info in error; got %v", err)
}
}
// TestIntegration_EnableSubCheckpoint_HappyPath asserts that
// WithLoopEnableSubCheckpoint makes the loop pass
// compose.WithCheckPointID to the sub-workflow on every nested
// call. The sub-workflow is a counter that uses its own
// checkpoint store; the test simply confirms the run does not
// fail with "receive checkpoint id but have not set checkpoint
// store".
func TestIntegration_EnableSubCheckpoint_HappyPath(t *testing.T) {
var subCalls atomic.Int64
subStore := newInMemoryStore()
sub := counterSub(t, &subCalls)
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 2, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(5),
WithLoopCompileOptions(compose.WithCheckPointStore(subStore)),
WithLoopEnableSubCheckpoint(true),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "sub-cp:loop:iter:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
if _, err := compiled.Invoke(context.Background(), 0); err != nil {
t.Fatalf("invoke: %v", err)
}
if got := subCalls.Load(); got != 2 {
t.Errorf("sub invocations: got %d, want 2", got)
}
}
// TestIntegration_ResumeContinuesSameIteration asserts the core P0
// resume contract: an interrupt during iteration N resumes at
// iteration N rather than restarting from 1.
func TestIntegration_ResumeContinuesSameIteration(t *testing.T) {
store := newInMemoryStore()
sub := compose.NewWorkflow[int, int]()
interrupted := false
lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
wasInterrupted, _, _ := compose.GetInterruptState[int](ctx)
if in == 1 && !wasInterrupted && !interrupted {
interrupted = true
return 0, compose.StatefulInterrupt(ctx, "pause-iter-2", in)
}
return in + 1, nil
})
node := sub.AddLambdaNode("inc", lambda)
node.AddInput(compose.START)
sub.End().AddInput("inc")
var iterations []int
shouldQuit := func(_ context.Context, iter, _, next int) (bool, error) {
iterations = append(iterations, iter)
return next >= 2, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(10),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "resume-same-iter:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(), compose.WithCheckPointStore(store))
if err != nil {
t.Fatalf("compile: %v", err)
}
cpID := "resume-same-iteration"
_, err = compiled.Invoke(context.Background(), 0, compose.WithCheckPointID(cpID))
if err == nil {
t.Fatal("expected interrupt error, got nil")
}
resumeCtx := compose.Resume(context.Background(), firstRootInterruptID(t, err))
out, err := compiled.Invoke(resumeCtx, 0, compose.WithCheckPointID(cpID))
if err != nil {
t.Fatalf("resume invoke: %v", err)
}
if out != 2 {
t.Fatalf("output: got %d, want 2", out)
}
if len(iterations) < 3 {
t.Fatalf("iterations too short: got %v, want prefix [1 2 3]", iterations)
}
wantPrefix := []int{1, 2, 3}
for i := range wantPrefix {
if iterations[i] != wantPrefix[i] {
t.Fatalf("iterations[%d]: got %d, want %d", i, iterations[i], wantPrefix[i])
}
}
}
// TestIntegration_WithForceNewRunRestartsLoop asserts that
// WithForceNewRun ignores the saved loop checkpoint and restarts
// the loop from iteration 1 on the next invocation.
func TestIntegration_WithForceNewRunRestartsLoop(t *testing.T) {
store := newInMemoryStore()
sub := compose.NewWorkflow[int, int]()
interruptions := 0
lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
wasInterrupted, _, _ := compose.GetInterruptState[int](ctx)
if in == 1 && !wasInterrupted {
interruptions++
return 0, compose.StatefulInterrupt(ctx, "force-new-run", in)
}
return in + 1, nil
})
node := sub.AddLambdaNode("inc", lambda)
node.AddInput(compose.START)
sub.End().AddInput("inc")
var iterations []int
shouldQuit := func(_ context.Context, iter, _, next int) (bool, error) {
iterations = append(iterations, iter)
return next >= 3, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(10),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "force-new-run:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(), compose.WithCheckPointStore(store))
if err != nil {
t.Fatalf("compile: %v", err)
}
cpID := "force-new-run"
_, err = compiled.Invoke(context.Background(), 0, compose.WithCheckPointID(cpID))
if err == nil {
t.Fatal("expected first interrupt, got nil")
}
_, err = compiled.Invoke(context.Background(), 0,
compose.WithCheckPointID(cpID),
compose.WithForceNewRun(),
)
if err == nil {
t.Fatal("expected second interrupt after force-new-run, got nil")
}
if interruptions != 2 {
t.Fatalf("interruptions: got %d, want 2", interruptions)
}
want := []int{1, 1}
if len(iterations) != len(want) {
t.Fatalf("iterations: got %v, want %v", iterations, want)
}
for i := range want {
if iterations[i] != want[i] {
t.Fatalf("iterations[%d]: got %d, want %d", i, iterations[i], want[i])
}
}
}
// TestIntegration_WithWriteToCheckPointIDPersistsToNewID asserts
// the interrupt state is written to the designated checkpoint ID
// and can be resumed from that new location.
func TestIntegration_WithWriteToCheckPointIDPersistsToNewID(t *testing.T) {
store := newInMemoryStore()
sub := interruptingSub(t)
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 1, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(5),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "write-to-cp:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(), compose.WithCheckPointStore(store))
if err != nil {
t.Fatalf("compile: %v", err)
}
oldID := "loop-old"
newID := "loop-new"
_, err = compiled.Invoke(context.Background(), 0,
compose.WithCheckPointID(oldID),
compose.WithWriteToCheckPointID(newID),
)
if err == nil {
t.Fatal("expected interrupt error, got nil")
}
if _, found, _ := store.Get(context.Background(), oldID); found {
t.Fatalf("old checkpoint %q should not be written", oldID)
}
if _, found, _ := store.Get(context.Background(), newID); !found {
t.Fatalf("new checkpoint %q was not written", newID)
}
resumeCtx := compose.Resume(context.Background(), firstRootInterruptID(t, err))
out, err := compiled.Invoke(resumeCtx, 0, compose.WithCheckPointID(newID))
if err != nil {
t.Fatalf("resume invoke: %v", err)
}
if out != 1 {
t.Fatalf("output: got %d, want 1", out)
}
}
// TestIntegration_StreamFinalOnly_ResumeExposesOnlyFinalIteration
// asserts that FinalOnly mode does not expose historical iteration
// chunks after resume.
func TestIntegration_StreamFinalOnly_ResumeExposesOnlyFinalIteration(t *testing.T) {
store := newInMemoryStore()
sub := compose.NewWorkflow[int, int]()
interrupted := false
lambda, err := compose.AnyLambda[int, int, struct{}](
nil,
func(ctx context.Context, in int, _ ...struct{}) (*schema.StreamReader[int], error) {
wasInterrupted, _, _ := compose.GetInterruptState[int](ctx)
if in == 10 && !wasInterrupted && !interrupted {
interrupted = true
return nil, compose.StatefulInterrupt(ctx, "stream-final-only", in)
}
return schema.StreamReaderFromArray([]int{in, in + 10}), nil
},
nil,
nil,
)
if err != nil {
t.Fatalf("AnyLambda: %v", err)
}
node := sub.AddLambdaNode("stream", lambda)
node.AddInput(compose.START)
sub.End().AddInput("stream")
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 20, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopStream(LoopStreamFinalOnly),
WithLoopMaxIterations(5),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "stream-final-only:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(), compose.WithCheckPointStore(store))
if err != nil {
t.Fatalf("compile: %v", err)
}
cpID := "stream-final-only"
sr, err := compiled.Stream(context.Background(), 0, compose.WithCheckPointID(cpID))
if err == nil {
_, err = drainStreamUntilError(t, sr)
}
if err == nil {
t.Fatal("expected interrupt error, got nil")
}
sr, err = compiled.Stream(context.Background(), 0, compose.WithCheckPointID(cpID))
if err != nil {
t.Fatalf("resume stream: %v", err)
}
got, err := readAllInts(t, sr)
if err != nil {
t.Fatalf("read stream: %v", err)
}
want := []int{10, 20}
if len(got) != len(want) {
t.Fatalf("chunks: got %v, want %v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("chunks[%d]: got %d, want %d", i, got[i], want[i])
}
}
}
// TestIntegration_StreamEveryIteration_ResumeFromFirstUnpublishedIteration
// asserts the documented replay contract for EveryIteration mode:
// fully published iterations are not replayed, while the interrupted
// iteration is replayed from its start.
func TestIntegration_StreamEveryIteration_ResumeFromFirstUnpublishedIteration(t *testing.T) {
store := newInMemoryStore()
sub := compose.NewWorkflow[int, int]()
interrupted := false
lambda, err := compose.AnyLambda[int, int, struct{}](
nil,
func(ctx context.Context, in int, _ ...struct{}) (*schema.StreamReader[int], error) {
wasInterrupted, _, _ := compose.GetInterruptState[int](ctx)
if in == 10 && !wasInterrupted && !interrupted {
interrupted = true
return nil, compose.StatefulInterrupt(ctx, "stream-every-iteration", in)
}
return schema.StreamReaderFromArray([]int{in, in + 10}), nil
},
nil,
nil,
)
if err != nil {
t.Fatalf("AnyLambda: %v", err)
}
node := sub.AddLambdaNode("stream", lambda)
node.AddInput(compose.START)
sub.End().AddInput("stream")
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 20, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopStream(LoopStreamEveryIteration),
WithLoopMaxIterations(5),
WithLoopCheckpointIDBuilder(func(_ string, iter int) string {
return "stream-every-iteration:" + itoa(iter)
}),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background(), compose.WithCheckPointStore(store))
if err != nil {
t.Fatalf("compile: %v", err)
}
cpID := "stream-every-iteration"
sr, err := compiled.Stream(context.Background(), 0, compose.WithCheckPointID(cpID))
if err == nil {
_, err = drainStreamUntilError(t, sr)
}
if err == nil {
t.Fatal("expected interrupt error, got nil")
}
sr, err = compiled.Stream(context.Background(), 0, compose.WithCheckPointID(cpID))
if err != nil {
t.Fatalf("resume stream: %v", err)
}
got, err := readAllInts(t, sr)
if err != nil {
t.Fatalf("read stream: %v", err)
}
want := []int{0, 10, 10, 20}
if len(got) != len(want) {
t.Fatalf("chunks: got %v, want %v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("chunks[%d]: got %d, want %d", i, got[i], want[i])
}
}
}
// streamingIncSub builds a sub-workflow whose Stream emits two chunks
// per iteration: {in, in+1}. The second chunk is the value the loop
// machinery uses as `next` (loop.go derives next from the last value
// emitted in the iteration). This sub deliberately never interrupts so
// the happy-path stream tests can assert chunk ordering across many
// iterations without exercising the resume code paths (which already
// have dedicated tests above).
func streamingIncSub(t *testing.T) *compose.Workflow[int, int] {
t.Helper()
wf := compose.NewWorkflow[int, int]()
lambda, err := compose.AnyLambda[int, int, struct{}](
nil,
func(_ context.Context, in int, _ ...struct{}) (*schema.StreamReader[int], error) {
return schema.StreamReaderFromArray([]int{in, in + 1}), nil
},
nil,
nil,
)
if err != nil {
t.Fatalf("AnyLambda: %v", err)
}
node := wf.AddLambdaNode("stream", lambda)
node.AddInput(compose.START)
wf.End().AddInput("stream")
return wf
}
// TestIntegration_StreamFinalOnly_HappyPath exercises the
// LoopStreamFinalOnly mode end-to-end on a fresh (no-interrupt) run.
// The existing FinalOnly stream test only covers the resume path; this
// test asserts the documented buffer-and-emit-last contract when no
// interrupt occurs.
//
// Iterations: in=0 -> [0,1] (next=1), in=1 -> [1,2] (next=2), in=2 ->
// [2,3] (next=3, quit). Caller must observe ONLY the final iteration's
// chunks: [2, 3].
func TestIntegration_StreamFinalOnly_HappyPath(t *testing.T) {
sub := streamingIncSub(t)
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 3, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopStream(LoopStreamFinalOnly),
WithLoopMaxIterations(10),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
sr, err := compiled.Stream(context.Background(), 0)
if err != nil {
t.Fatalf("stream: %v", err)
}
got, err := readAllInts(t, sr)
if err != nil {
t.Fatalf("read stream: %v", err)
}
want := []int{2, 3}
if len(got) != len(want) {
t.Fatalf("chunks: got %v, want %v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("chunks[%d]: got %d, want %d", i, got[i], want[i])
}
}
}
// TestIntegration_StreamEveryIteration_HappyPath exercises the
// LoopStreamEveryIteration mode end-to-end on a fresh run. The
// existing EveryIteration stream test only covers the replay path on
// resume; this test asserts the documented forward-every-iteration
// contract when no interrupt occurs.
//
// Iterations: in=0 -> [0,1], in=1 -> [1,2], in=2 -> [2,3] (quit).
// Caller must observe every iteration's chunks concatenated in order:
// [0, 1, 1, 2, 2, 3].
func TestIntegration_StreamEveryIteration_HappyPath(t *testing.T) {
sub := streamingIncSub(t)
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 3, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopStream(LoopStreamEveryIteration),
WithLoopMaxIterations(10),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
sr, err := compiled.Stream(context.Background(), 0)
if err != nil {
t.Fatalf("stream: %v", err)
}
got, err := readAllInts(t, sr)
if err != nil {
t.Fatalf("read stream: %v", err)
}
want := []int{0, 1, 1, 2, 2, 3}
if len(got) != len(want) {
t.Fatalf("chunks: got %v, want %v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("chunks[%d]: got %d, want %d", i, got[i], want[i])
}
}
}
// TestIntegration_Stream_EmptyIterationFails covers the empty-stream
// error branch in runLoopStream: a sub-workflow that yields zero
// chunks for an iteration leaves the loop with no value to feed into
// shouldQuit or into the next iteration's input, so the loop must
// fail with the documented "produced empty stream" error. Without
// this test the branch (loop.go: "iteration N produced empty stream")
// is unreachable from the existing test surface.
func TestIntegration_Stream_EmptyIterationFails(t *testing.T) {
sub := compose.NewWorkflow[int, int]()
lambda, err := compose.AnyLambda[int, int, struct{}](
nil,
func(_ context.Context, _ int, _ ...struct{}) (*schema.StreamReader[int], error) {
return schema.StreamReaderFromArray([]int{}), nil
},
nil,
nil,
)
if err != nil {
t.Fatalf("AnyLambda: %v", err)
}
node := sub.AddLambdaNode("empty", lambda)
node.AddInput(compose.START)
sub.End().AddInput("empty")
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
t.Fatal("shouldQuit must not be called when iteration stream is empty")
return false, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopStream(LoopStreamFinalOnly),
WithLoopMaxIterations(3),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
sr, err := compiled.Stream(context.Background(), 0)
if err == nil {
_, err = readAllInts(t, sr)
}
if err == nil {
t.Fatal("expected empty-stream error, got nil")
}
if msg := err.Error(); !contains(msg, "produced empty stream") {
t.Fatalf("error %q must mention 'produced empty stream'", msg)
}
}
// contains is a tiny strings.Contains shim kept in this file to
// avoid pulling the strings import into the test package solely for
// one assertion (loop_test.go already imports it; loop_integration_
// test.go does not).
func contains(haystack, needle string) bool {
for i := 0; i+len(needle) <= len(haystack); i++ {
if haystack[i:i+len(needle)] == needle {
return true
}
}
return false
}