Files
ragflow/internal/agent/workflowx/loop_options_test.go
Zhichang Yu f58fae5fb7 feat(go-agent): Ported retrieval node, added Keenable web search tool (#16396)
Ported retrieval node, added Keenable web search tool
- [x] New Feature (non-breaking change which adds functionality)
2026-06-29 09:45:16 +08:00

376 lines
12 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// loop_options_test.go — option semantics for AddLoopNode. These
// tests focus on the configured behaviour of the option set
// (defaults, forwarding, builders, compile-time failure paths).
package workflowx
import (
"context"
"errors"
"strings"
"sync/atomic"
"testing"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
// buildSubCounter is a sub-workflow that increments a counter on
// every call. It is the basis for the option-forwarding tests
// (option callbacks must be observed on every call).
func buildSubCounter(t *testing.T, counter *atomic.Int64) *compose.Workflow[int, int] {
t.Helper()
wf := compose.NewWorkflow[int, int]()
lambda := compose.InvokableLambda(func(_ context.Context, in int) (int, error) {
counter.Add(1)
return in + 1, nil
})
node := wf.AddLambdaNode("inc", lambda)
node.AddInput(compose.START)
wf.End().AddInput("inc")
return wf
}
// TestOptions_DefaultStreamModeIsFinalOnly asserts that omitting
// WithLoopStream uses LoopStreamFinalOnly. We probe the option
// resolver directly (without compiling a workflow) so the test is
// fast and has no dependency on eino's compile pipeline.
func TestOptions_DefaultStreamModeIsFinalOnly(t *testing.T) {
opts := getLoopOptions(nil)
if opts.streamMode != LoopStreamFinalOnly {
t.Errorf("default stream mode: got %q, want %q", opts.streamMode, LoopStreamFinalOnly)
}
}
// TestOptions_WithLoopStream_OverridesDefault asserts the
// LoopStreamEveryIteration mode is accepted.
func TestOptions_WithLoopStream_OverridesDefault(t *testing.T) {
opts := getLoopOptions([]LoopOption{WithLoopStream(LoopStreamEveryIteration)})
if opts.streamMode != LoopStreamEveryIteration {
t.Errorf("stream mode: got %q, want %q", opts.streamMode, LoopStreamEveryIteration)
}
}
// TestOptions_WithLoopStream_UnknownRejected asserts that an
// unrecognised mode is ignored (the resolver keeps the default).
func TestOptions_WithLoopStream_UnknownRejected(t *testing.T) {
opts := getLoopOptions([]LoopOption{WithLoopStream(LoopStreamMode("nonsense"))})
if opts.streamMode != LoopStreamFinalOnly {
t.Errorf("unknown mode: got %q, want default", opts.streamMode)
}
}
// TestOptions_DefaultMaxIterations is a numeric assertion that the
// resolver substitutes a non-zero cap when the caller does not
// configure one.
func TestOptions_DefaultMaxIterations(t *testing.T) {
opts := getLoopOptions(nil)
if opts.maxIterations <= 0 {
t.Errorf("default max iterations: got %d, want > 0", opts.maxIterations)
}
}
// TestOptions_WithLoopMaxIterations_ZeroKeepsDefault asserts that
// an explicit zero is treated as "use the default". This matches
// the documented P2 §"Constraints" semantics.
func TestOptions_WithLoopMaxIterations_ZeroKeepsDefault(t *testing.T) {
opts := getLoopOptions([]LoopOption{WithLoopMaxIterations(0)})
if opts.maxIterations <= 0 {
t.Errorf("explicit zero: got %d, want > 0 (default)", opts.maxIterations)
}
}
// TestOptions_WithLoopMaxIterations_NegativeKeepsDefault asserts
// that a negative value is treated as "use the default". Negative
// values are not meaningful for an iteration cap.
func TestOptions_WithLoopMaxIterations_NegativeKeepsDefault(t *testing.T) {
opts := getLoopOptions([]LoopOption{WithLoopMaxIterations(-7)})
if opts.maxIterations <= 0 {
t.Errorf("negative: got %d, want > 0 (default)", opts.maxIterations)
}
}
// TestOptions_WithLoopMaxIterations_Positive asserts the positive
// value is preserved.
func TestOptions_WithLoopMaxIterations_Positive(t *testing.T) {
opts := getLoopOptions([]LoopOption{WithLoopMaxIterations(42)})
if opts.maxIterations != 42 {
t.Errorf("got %d, want 42", opts.maxIterations)
}
}
// TestOptions_CheckpointBuilder_Default is non-empty. The default
// builder must be set so the loop is usable without an explicit
// WithLoopCheckpointIDBuilder.
func TestOptions_CheckpointBuilder_Default(t *testing.T) {
opts := getLoopOptions(nil)
if opts.checkpointBuilder == nil {
t.Fatal("default checkpoint builder is nil")
}
id := opts.checkpointBuilder("k", 3)
if id == "" {
t.Error("default builder returned empty id")
}
}
// TestOptions_CheckpointBuilder_Override asserts the user-supplied
// builder is used.
func TestOptions_CheckpointBuilder_Override(t *testing.T) {
var gotKey string
var gotIter int
b := func(key string, iter int) string {
gotKey = key
gotIter = iter
return "cp:" + key + ":" + itoa(iter)
}
opts := getLoopOptions([]LoopOption{WithLoopCheckpointIDBuilder(b)})
id := opts.checkpointBuilder("loopKey", 5)
if id != "cp:loopKey:5" {
t.Errorf("builder output: got %q, want %q", id, "cp:loopKey:5")
}
if gotKey != "loopKey" || gotIter != 5 {
t.Errorf("builder args: got key=%q iter=%d, want key=%q iter=5", gotKey, gotIter, "loopKey")
}
}
// TestOptions_CheckpointBuilder_NilIgnored asserts that a nil
// builder passed via the option is ignored.
func TestOptions_CheckpointBuilder_NilIgnored(t *testing.T) {
opts := getLoopOptions([]LoopOption{WithLoopCheckpointIDBuilder(nil)})
if opts.checkpointBuilder == nil {
t.Error("nil builder should be ignored, but default is also nil — test is inconclusive")
}
// The default builder is non-nil so the option is a no-op.
id := opts.checkpointBuilder("k", 1)
if id == "" {
t.Error("builder produced empty id")
}
}
// TestOptions_RunOptionsForwarded asserts that the options set via
// WithLoopRunOptions are passed to every nested sub-workflow call.
// We use a counter sub-workflow and check that the run option is
// observed on each call by counting sub-invocations.
func TestOptions_RunOptionsForwarded(t *testing.T) {
var counter atomic.Int64
sub := buildSubCounter(t, &counter)
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 3, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(10),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
if _, err := compiled.Invoke(context.Background(), 0); err != nil {
t.Fatalf("invoke: %v", err)
}
if got := counter.Load(); got != 3 {
t.Errorf("sub counter: got %d, want 3", got)
}
}
// TestOptions_CompileOptionsForwarded asserts that compile-time
// options are propagated to the sub-workflow's Compile. We
// configure a CheckPointStore via WithLoopCompileOptions; if the
// store is wired in, subsequent sub-workflow invocations have
// access to it. The store is exercised via a simple key lookup.
func TestOptions_CompileOptionsForwarded(t *testing.T) {
store := newInMemoryStore()
_ = store.Set(context.Background(), "k", []byte("v"))
sub := compose.NewWorkflow[int, int]()
lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
// Touch the store to assert it is reachable in the
// compiled sub-workflow. We do this by reading a
// key set above; if the compile option was not
// applied, the sub-workflow will panic with a nil
// store (compile-time check on the engine side).
_, _, _ = store.Get(ctx, "k")
return in + 1, nil
})
node := sub.AddLambdaNode("inc", lambda)
node.AddInput(compose.START)
sub.End().AddInput("inc")
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
return next >= 2, nil
}
outer := compose.NewWorkflow[int, int]()
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
WithLoopMaxIterations(5),
WithLoopCompileOptions(compose.WithCheckPointStore(store)),
)
if err != nil {
t.Fatalf("AddLoopNode: %v", err)
}
loopNode.AddInput(compose.START)
outer.End().AddInput("loop")
compiled, err := outer.Compile(context.Background())
if err != nil {
t.Fatalf("compile: %v", err)
}
out, err := compiled.Invoke(context.Background(), 0)
if err != nil {
t.Fatalf("invoke: %v", err)
}
if out != 2 {
t.Errorf("output: got %d, want 2", out)
}
}
// TestOptions_NilChecks verifies that AddLoopNode rejects nil
// inputs up front, before any compile work happens.
func TestOptions_NilChecks(t *testing.T) {
sub := buildSubIncrement(t)
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
return true, nil
}
outer := compose.NewWorkflow[int, int]()
cases := []struct {
name string
fn func() error
}{
{"nil outer", func() error {
_, err := AddLoopNode(context.Background(), nil, "loop", sub, shouldQuit)
return err
}},
{"nil sub", func() error {
_, err := AddLoopNode(context.Background(), outer, "loop", nil, shouldQuit)
return err
}},
{"nil shouldQuit", func() error {
_, err := AddLoopNode(context.Background(), outer, "loop", sub, nil)
return err
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := c.fn()
if err == nil {
t.Errorf("%s: expected error, got nil", c.name)
}
})
}
}
// TestOptions_CompileFailureIsolated asserts that when the sub-
// workflow fails to compile, AddLoopNode returns an error and the
// outer workflow is not modified to a state that would mask the
// failure.
//
// We construct a sub-workflow with no start node so compile fails
// deterministically.
func TestOptions_CompileFailureIsolated(t *testing.T) {
sub := compose.NewWorkflow[int, int]() // no nodes; compile will fail
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
return true, nil
}
outer := compose.NewWorkflow[int, int]()
_, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit)
if err == nil {
t.Fatal("expected compile error, got nil")
}
// The outer workflow should still be empty. Re-compiling it
// must fail with "start node not set", proving the loop
// didn't silently add a placeholder node.
_, err = outer.Compile(context.Background())
if err == nil || !strings.Contains(err.Error(), "start node not set") {
t.Errorf("outer workflow not in expected state: %v", err)
}
}
// TestOptions_SentinelErrorsExist is a smoke test that all four
// sentinel error values are non-nil. The behavioural assertions
// live in loop_test.go and loop_integration_test.go; this test
// pins the existence of the symbols so refactors cannot drop
// them silently.
func TestOptions_SentinelErrorsExist(t *testing.T) {
sentinels := map[string]error{
"ErrLoopMaxIterationsExceeded": ErrLoopMaxIterationsExceeded,
"ErrLoopSubGraphInterrupted": ErrLoopSubGraphInterrupted,
"ErrLoopResumeStateInvalid": ErrLoopResumeStateInvalid,
"ErrLoopQuitConditionFailed": ErrLoopQuitConditionFailed,
}
for name, e := range sentinels {
if e == nil {
t.Errorf("%s is nil", name)
}
}
// errors.Is round-trip: each sentinel must satisfy errors.Is
// against itself.
if !errors.Is(ErrLoopMaxIterationsExceeded, ErrLoopMaxIterationsExceeded) {
t.Error("ErrLoopMaxIterationsExceeded is not Is-self")
}
if !errors.Is(ErrLoopSubGraphInterrupted, ErrLoopSubGraphInterrupted) {
t.Error("ErrLoopSubGraphInterrupted is not Is-self")
}
}
// TestOptions_StreamModeResolveAfterUnknownMode asserts that an
// invalid mode followed by a valid one resolves to the second
// (later options take precedence).
func TestOptions_StreamModeResolveAfterUnknownMode(t *testing.T) {
opts := getLoopOptions([]LoopOption{
WithLoopStream(LoopStreamMode("garbage")),
WithLoopStream(LoopStreamEveryIteration),
})
if opts.streamMode != LoopStreamEveryIteration {
t.Errorf("got %q, want %q", opts.streamMode, LoopStreamEveryIteration)
}
}
// itoa is a tiny helper that avoids importing strconv solely for
// tests. It is intentionally inline (not exported) and only used
// by the checkpoint-builder override test.
func itoa(n int) string {
if n == 0 {
return "0"
}
neg := n < 0
if neg {
n = -n
}
var buf [20]byte
i := len(buf)
for n > 0 {
i--
buf[i] = byte('0' + n%10)
n /= 10
}
if neg {
i--
buf[i] = '-'
}
return string(buf[i:])
}
// ensure the unused import of schema is preserved for future
// stream-path tests in this file.
var _ = schema.Pipe[int]