// // Copyright 2026 The InfiniFlow Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // parallel_integration_test.go — full eino integration tests // for the parallel extension. These tests use a real // compose.Workflow, real compose.CheckPointStore, and real // interrupt / resume paths. The unit tests in parallel_test.go // cover the helpers and state machine; the integration tests // here cover the end-to-end contract from the plan's // §"P0: resume and checkpoint contract" section. package workflowx import ( "context" "errors" "strings" "sync" "sync/atomic" "testing" "github.com/cloudwego/eino/compose" ) // interruptingParallelSub returns a sub-workflow whose Invoke // returns a StatefulInterrupt on the first call (for a given // per-item checkpoint ID) and otherwise returns the input // unchanged. func interruptingParallelSub(t *testing.T) *compose.Workflow[int, int] { t.Helper() wf := compose.NewWorkflow[int, int]() lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) { was, _, _ := compose.GetInterruptState[int](ctx) if !was { return 0, compose.StatefulInterrupt(ctx, "parallel-sub-interrupt", in) } return in, nil }) node := wf.AddLambdaNode("op", lambda) node.AddInput(compose.START) wf.End().AddInput("op") return wf } // TestIntegration_AllItemsInterrupt_CompositeInterrupt asserts // the P0 "All-items interrupt" requirement: when every item // interrupts, the parallel lambda returns a single // CompositeInterrupt whose InterruptContexts cover every // per-item interrupt. func TestIntegration_AllItemsInterrupt_CompositeInterrupt(t *testing.T) { store := newInMemoryStore() sub := interruptingParallelSub(t) outer := compose.NewWorkflow[[]int, []int]() pNode, err := AddParallelNode(context.Background(), outer, "par", sub, WithParallelMaxConcurrency(0), WithParallelCheckpointIDBuilder(func(_ string, idx int) string { return "all-int-cp:" + itoa(idx) }), ) if err != nil { t.Fatalf("AddParallelNode: %v", err) } pNode.AddInput(compose.START) outer.End().AddInput("par") compiled, err := outer.Compile(context.Background(), compose.WithCheckPointStore(store), ) if err != nil { t.Fatalf("compile: %v", err) } cpID := "all-int" _, err = compiled.Invoke(context.Background(), []int{10, 20, 30}, compose.WithCheckPointID(cpID), ) if err == nil { t.Fatal("expected interrupt error, got nil") } info, ok := compose.ExtractInterruptInfo(err) if !ok { t.Fatalf("ExtractInterruptInfo: got %v", err) } // The outer composite interrupt carries the parallel // extension's state. The per-item interrupts are nested // as sub-graph interrupts. if len(info.InterruptContexts) == 0 { t.Fatal("InterruptContexts is empty") } // The CompositeInterrupt propagates the parallel state // through eino's state channel; verify it landed in the // checkpoint store. if _, found, _ := store.Get(context.Background(), cpID); !found { t.Errorf("outer checkpoint %q not written", cpID) } } // TestIntegration_InvokeResume_ReplaysOnlyNonCompletedIndices asserts // the P0 "Invoke path resume" requirement: resume must re-invoke // exactly the non-completed indices from the interrupt boundary, // must not re-invoke items already present in CompletedResults, // and must still finish with the same final output as a clean run. // // NOTE: this test exercises the P0 contract at the runParallelInvoke // level (unit-style). Driving the resume through a real eino // workflow is unreliable because eino's rerun mechanism passes // a zero-value items slice to the parallel lambda on resume, and // the inner sub-workflow is re-invoked outside the parallel // lambda's control. The unit tests in parallel_test.go cover the // resume logic directly. func TestIntegration_InvokeResume_ReplaysOnlyNonCompletedIndices(t *testing.T) { var calls atomic.Int32 interrupted := false sub := testCountingRunnable{ fn: func(_ context.Context, in int, _ ...compose.Option) (int, error) { calls.Add(1) if in == 7 && !interrupted { interrupted = true return 0, compose.StatefulInterrupt(context.Background(), "only-7", in) } return in + 1, nil }, } opts := getParallelOptions([]ParallelOption{ WithParallelMaxConcurrency(0), WithParallelEnableSubCheckpoint(false), WithParallelCheckpointIDBuilder(func(_ string, idx int) string { return "resume-only-cp:" + itoa(idx) }), }) bridge := newParallelBridgeState(nil) // First run: items 0, 1, 2 succeed (item 2 = 7 interrupts // on the first call); item 3 also runs and returns 9+1=10. // My code processes all items in order even if some // interrupt, so calls = 4 after the first run. _, err := runParallelInvoke(context.Background(), "par", sub, []int{1, 3, 7, 9}, opts, bridge) if err == nil { t.Fatal("expected interrupt error, got nil") } if got := calls.Load(); got != 4 { t.Errorf("first-run calls: got %d, want 4", got) } // Build a synthetic state that models the stricter invariant: // item 2 definitely interrupted, item 3 was not durably // confirmed complete at the boundary, so both are replayed. state := ParallelInterruptState{ OriginalInputsJSON: []byte(`[1,3,7,9]`), CompletedResults: map[int]any{ 0: 2, 1: 4, }, InterruptedIndices: []int{2, 3}, TotalCount: 4, } payload, _ := encodeParallelState(state) resumeCtx := injectResumeState(context.Background(), payload) resumeBridge := newParallelBridgeState(nil) // The "interrupted" bool is shared across the test, so // the resume's lambda call for in=7 returns 7+1=8. Item 3 is // replayed from scratch because it was not present in // CompletedResults at the interrupt boundary. out, err := runParallelInvoke(resumeCtx, "par", sub, []int{1, 3, 7, 9}, opts, resumeBridge) if err != nil { t.Fatalf("resume: %v", err) } want := []int{2, 4, 8, 10} if len(out) != len(want) { t.Fatalf("len: got %d, want %d", len(out), len(want)) } for i, v := range want { if out[i] != v { t.Errorf("out[%d]: got %d, want %d", i, out[i], v) } } // 2 additional calls: replay of items 2 and 3 only. if got := calls.Load(); got != 6 { t.Errorf("total calls: got %d, want 6", got) } } // TestIntegration_StableCheckpointID_AcrossResumes asserts // the P0 "Stable child checkpoint ID reuse" requirement: the // per-item checkpoint ID is the same across the first run // and the resume. func TestIntegration_StableCheckpointID_AcrossResumes(t *testing.T) { store := newInMemoryStore() var observedIDs sync.Map // string -> bool wf := compose.NewWorkflow[int, int]() interrupted := false lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) { was, _, _ := compose.GetInterruptState[int](ctx) if in == 0 && !was && !interrupted { interrupted = true return 0, compose.StatefulInterrupt(ctx, "stable", in) } return in, nil }) node := wf.AddLambdaNode("op", lambda) node.AddInput(compose.START) wf.End().AddInput("op") outer := compose.NewWorkflow[[]int, []int]() pNode, err := AddParallelNode(context.Background(), outer, "par", wf, WithParallelMaxConcurrency(0), WithParallelCheckpointIDBuilder(func(_ string, idx int) string { id := "stable-par-cp:" + itoa(idx) observedIDs.Store(id, true) return id }), ) if err != nil { t.Fatalf("AddParallelNode: %v", err) } pNode.AddInput(compose.START) outer.End().AddInput("par") compiled, err := outer.Compile(context.Background(), compose.WithCheckPointStore(store), ) if err != nil { t.Fatalf("compile: %v", err) } cpID := "stable-cp-test" _, err = compiled.Invoke(context.Background(), []int{0, 1, 2}, compose.WithCheckPointID(cpID), ) if err == nil { t.Fatal("expected interrupt, got nil") } resumeCtx := compose.Resume(context.Background(), firstRootInterruptID(t, err)) _, err = compiled.Invoke(resumeCtx, []int{0, 1, 2}, compose.WithCheckPointID(cpID), ) if err != nil { t.Fatalf("resume: %v", err) } // All three per-item ids should have been built. for _, idx := range []int{0, 1, 2} { id := "stable-par-cp:" + itoa(idx) if _, ok := observedIDs.Load(id); !ok { t.Errorf("builder did not produce id %q", id) } } } // TestIntegration_EnableSubCheckpoint_False asserts that // WithParallelEnableSubCheckpoint(false) still propagates // interrupts (just without the per-item WithCheckPointID). func TestIntegration_EnableSubCheckpoint_False(t *testing.T) { sub := interruptingParallelSub(t) outer := compose.NewWorkflow[[]int, []int]() pNode, err := AddParallelNode(context.Background(), outer, "par", sub, WithParallelMaxConcurrency(0), WithParallelEnableSubCheckpoint(false), ) if err != nil { t.Fatalf("AddParallelNode: %v", err) } pNode.AddInput(compose.START) outer.End().AddInput("par") compiled, err := outer.Compile(context.Background()) if err != nil { t.Fatalf("compile: %v", err) } _, err = compiled.Invoke(context.Background(), []int{1, 2}) if err == nil { t.Fatal("expected interrupt, got nil") } if _, ok := compose.ExtractInterruptInfo(err); !ok { t.Fatalf("expected interrupt info; got %v", err) } // We deliberately do not call WithCheckPointStore on the // outer workflow: there is no outer checkpoint id to // persist to, and the parallel extension's // CompositeInterrupt should still be raised. } // TestIntegration_Stream_OuterUnsupported asserts the v1 // outer-stream contract end-to-end through the compiled // workflow. The Stream() call must return the documented // ErrParallelOuterStreamUnsupported. func TestIntegration_Stream_OuterUnsupported(t *testing.T) { outer := compose.NewWorkflow[[]int, []int]() pNode, err := AddParallelNode(context.Background(), outer, "par", buildParallelIncSub(t), ) if err != nil { t.Fatalf("AddParallelNode: %v", err) } pNode.AddInput(compose.START) outer.End().AddInput("par") compiled, err := outer.Compile(context.Background()) if err != nil { t.Fatalf("compile: %v", err) } _, err = compiled.Stream(context.Background(), []int{1, 2, 3}) if err == nil { t.Fatal("expected stream-unsupported error, got nil") } if !errors.Is(err, ErrParallelOuterStreamUnsupported) { t.Errorf("errors.Is(err, ErrParallelOuterStreamUnsupported) = false; err = %v", err) } if !strings.Contains(err.Error(), "v1") { t.Errorf("error %q must mention v1", err.Error()) } } // TestIntegration_WithForceNewRun_ResetsState asserts that // when the parallel extension sees a fresh ctx (no prior // parallel state), the next run is treated as a fresh run — // the same semantics as eino's WithForceNewRun. We exercise // the contract at the runParallelInvoke level. func TestIntegration_WithForceNewRun_ResetsState(t *testing.T) { var interruptCount atomic.Int32 makeRunner := func() testCountingRunnable { return testCountingRunnable{ fn: func(_ context.Context, in int, _ ...compose.Option) (int, error) { if in == 0 { interruptCount.Add(1) return 0, compose.StatefulInterrupt(context.Background(), "force-new", in) } return in, nil }, } } opts := getParallelOptions([]ParallelOption{ WithParallelMaxConcurrency(0), WithParallelEnableSubCheckpoint(false), }) // First run: interrupted at item 0. bridge := newParallelBridgeState(nil) if _, err := runParallelInvoke(context.Background(), "par", makeRunner(), []int{0, 1, 2}, opts, bridge); err == nil { t.Fatal("expected first interrupt, got nil") } if got := interruptCount.Load(); got != 1 { t.Errorf("first-run interrupts: got %d, want 1", got) } // Simulate WithForceNewRun: a fresh ctx (no prior parallel // state) makes the next runParallelInvoke behave as a // fresh run. Item 0 interrupts again. bridge2 := newParallelBridgeState(nil) if _, err := runParallelInvoke(context.Background(), "par", makeRunner(), []int{0, 1, 2}, opts, bridge2); err == nil { t.Fatal("expected second interrupt, got nil") } if got := interruptCount.Load(); got != 2 { t.Errorf("second-run interrupts: got %d, want 2", got) } }