mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Ported retrieval node, added Keenable web search tool - [x] New Feature (non-breaking change which adds functionality)
376 lines
12 KiB
Go
376 lines
12 KiB
Go
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
// loop_options_test.go — option semantics for AddLoopNode. These
|
|
// tests focus on the configured behaviour of the option set
|
|
// (defaults, forwarding, builders, compile-time failure paths).
|
|
package workflowx
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
|
|
"github.com/cloudwego/eino/compose"
|
|
"github.com/cloudwego/eino/schema"
|
|
)
|
|
|
|
// buildSubCounter is a sub-workflow that increments a counter on
|
|
// every call. It is the basis for the option-forwarding tests
|
|
// (option callbacks must be observed on every call).
|
|
func buildSubCounter(t *testing.T, counter *atomic.Int64) *compose.Workflow[int, int] {
|
|
t.Helper()
|
|
wf := compose.NewWorkflow[int, int]()
|
|
lambda := compose.InvokableLambda(func(_ context.Context, in int) (int, error) {
|
|
counter.Add(1)
|
|
return in + 1, nil
|
|
})
|
|
node := wf.AddLambdaNode("inc", lambda)
|
|
node.AddInput(compose.START)
|
|
wf.End().AddInput("inc")
|
|
return wf
|
|
}
|
|
|
|
// TestOptions_DefaultStreamModeIsFinalOnly asserts that omitting
|
|
// WithLoopStream uses LoopStreamFinalOnly. We probe the option
|
|
// resolver directly (without compiling a workflow) so the test is
|
|
// fast and has no dependency on eino's compile pipeline.
|
|
func TestOptions_DefaultStreamModeIsFinalOnly(t *testing.T) {
|
|
opts := getLoopOptions(nil)
|
|
if opts.streamMode != LoopStreamFinalOnly {
|
|
t.Errorf("default stream mode: got %q, want %q", opts.streamMode, LoopStreamFinalOnly)
|
|
}
|
|
}
|
|
|
|
// TestOptions_WithLoopStream_OverridesDefault asserts the
|
|
// LoopStreamEveryIteration mode is accepted.
|
|
func TestOptions_WithLoopStream_OverridesDefault(t *testing.T) {
|
|
opts := getLoopOptions([]LoopOption{WithLoopStream(LoopStreamEveryIteration)})
|
|
if opts.streamMode != LoopStreamEveryIteration {
|
|
t.Errorf("stream mode: got %q, want %q", opts.streamMode, LoopStreamEveryIteration)
|
|
}
|
|
}
|
|
|
|
// TestOptions_WithLoopStream_UnknownRejected asserts that an
|
|
// unrecognised mode is ignored (the resolver keeps the default).
|
|
func TestOptions_WithLoopStream_UnknownRejected(t *testing.T) {
|
|
opts := getLoopOptions([]LoopOption{WithLoopStream(LoopStreamMode("nonsense"))})
|
|
if opts.streamMode != LoopStreamFinalOnly {
|
|
t.Errorf("unknown mode: got %q, want default", opts.streamMode)
|
|
}
|
|
}
|
|
|
|
// TestOptions_DefaultMaxIterations is a numeric assertion that the
|
|
// resolver substitutes a non-zero cap when the caller does not
|
|
// configure one.
|
|
func TestOptions_DefaultMaxIterations(t *testing.T) {
|
|
opts := getLoopOptions(nil)
|
|
if opts.maxIterations <= 0 {
|
|
t.Errorf("default max iterations: got %d, want > 0", opts.maxIterations)
|
|
}
|
|
}
|
|
|
|
// TestOptions_WithLoopMaxIterations_ZeroKeepsDefault asserts that
|
|
// an explicit zero is treated as "use the default". This matches
|
|
// the documented P2 §"Constraints" semantics.
|
|
func TestOptions_WithLoopMaxIterations_ZeroKeepsDefault(t *testing.T) {
|
|
opts := getLoopOptions([]LoopOption{WithLoopMaxIterations(0)})
|
|
if opts.maxIterations <= 0 {
|
|
t.Errorf("explicit zero: got %d, want > 0 (default)", opts.maxIterations)
|
|
}
|
|
}
|
|
|
|
// TestOptions_WithLoopMaxIterations_NegativeKeepsDefault asserts
|
|
// that a negative value is treated as "use the default". Negative
|
|
// values are not meaningful for an iteration cap.
|
|
func TestOptions_WithLoopMaxIterations_NegativeKeepsDefault(t *testing.T) {
|
|
opts := getLoopOptions([]LoopOption{WithLoopMaxIterations(-7)})
|
|
if opts.maxIterations <= 0 {
|
|
t.Errorf("negative: got %d, want > 0 (default)", opts.maxIterations)
|
|
}
|
|
}
|
|
|
|
// TestOptions_WithLoopMaxIterations_Positive asserts the positive
|
|
// value is preserved.
|
|
func TestOptions_WithLoopMaxIterations_Positive(t *testing.T) {
|
|
opts := getLoopOptions([]LoopOption{WithLoopMaxIterations(42)})
|
|
if opts.maxIterations != 42 {
|
|
t.Errorf("got %d, want 42", opts.maxIterations)
|
|
}
|
|
}
|
|
|
|
// TestOptions_CheckpointBuilder_Default is non-empty. The default
|
|
// builder must be set so the loop is usable without an explicit
|
|
// WithLoopCheckpointIDBuilder.
|
|
func TestOptions_CheckpointBuilder_Default(t *testing.T) {
|
|
opts := getLoopOptions(nil)
|
|
if opts.checkpointBuilder == nil {
|
|
t.Fatal("default checkpoint builder is nil")
|
|
}
|
|
id := opts.checkpointBuilder("k", 3)
|
|
if id == "" {
|
|
t.Error("default builder returned empty id")
|
|
}
|
|
}
|
|
|
|
// TestOptions_CheckpointBuilder_Override asserts the user-supplied
|
|
// builder is used.
|
|
func TestOptions_CheckpointBuilder_Override(t *testing.T) {
|
|
var gotKey string
|
|
var gotIter int
|
|
b := func(key string, iter int) string {
|
|
gotKey = key
|
|
gotIter = iter
|
|
return "cp:" + key + ":" + itoa(iter)
|
|
}
|
|
opts := getLoopOptions([]LoopOption{WithLoopCheckpointIDBuilder(b)})
|
|
id := opts.checkpointBuilder("loopKey", 5)
|
|
if id != "cp:loopKey:5" {
|
|
t.Errorf("builder output: got %q, want %q", id, "cp:loopKey:5")
|
|
}
|
|
if gotKey != "loopKey" || gotIter != 5 {
|
|
t.Errorf("builder args: got key=%q iter=%d, want key=%q iter=5", gotKey, gotIter, "loopKey")
|
|
}
|
|
}
|
|
|
|
// TestOptions_CheckpointBuilder_NilIgnored asserts that a nil
|
|
// builder passed via the option is ignored.
|
|
func TestOptions_CheckpointBuilder_NilIgnored(t *testing.T) {
|
|
opts := getLoopOptions([]LoopOption{WithLoopCheckpointIDBuilder(nil)})
|
|
if opts.checkpointBuilder == nil {
|
|
t.Error("nil builder should be ignored, but default is also nil — test is inconclusive")
|
|
}
|
|
// The default builder is non-nil so the option is a no-op.
|
|
id := opts.checkpointBuilder("k", 1)
|
|
if id == "" {
|
|
t.Error("builder produced empty id")
|
|
}
|
|
}
|
|
|
|
// TestOptions_RunOptionsForwarded asserts that the options set via
|
|
// WithLoopRunOptions are passed to every nested sub-workflow call.
|
|
// We use a counter sub-workflow and check that the run option is
|
|
// observed on each call by counting sub-invocations.
|
|
func TestOptions_RunOptionsForwarded(t *testing.T) {
|
|
var counter atomic.Int64
|
|
sub := buildSubCounter(t, &counter)
|
|
|
|
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
|
|
return next >= 3, nil
|
|
}
|
|
outer := compose.NewWorkflow[int, int]()
|
|
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
|
|
WithLoopMaxIterations(10),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("AddLoopNode: %v", err)
|
|
}
|
|
loopNode.AddInput(compose.START)
|
|
outer.End().AddInput("loop")
|
|
compiled, err := outer.Compile(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("compile: %v", err)
|
|
}
|
|
if _, err := compiled.Invoke(context.Background(), 0); err != nil {
|
|
t.Fatalf("invoke: %v", err)
|
|
}
|
|
if got := counter.Load(); got != 3 {
|
|
t.Errorf("sub counter: got %d, want 3", got)
|
|
}
|
|
}
|
|
|
|
// TestOptions_CompileOptionsForwarded asserts that compile-time
|
|
// options are propagated to the sub-workflow's Compile. We
|
|
// configure a CheckPointStore via WithLoopCompileOptions; if the
|
|
// store is wired in, subsequent sub-workflow invocations have
|
|
// access to it. The store is exercised via a simple key lookup.
|
|
func TestOptions_CompileOptionsForwarded(t *testing.T) {
|
|
store := newInMemoryStore()
|
|
_ = store.Set(context.Background(), "k", []byte("v"))
|
|
|
|
sub := compose.NewWorkflow[int, int]()
|
|
lambda := compose.InvokableLambda(func(ctx context.Context, in int) (int, error) {
|
|
// Touch the store to assert it is reachable in the
|
|
// compiled sub-workflow. We do this by reading a
|
|
// key set above; if the compile option was not
|
|
// applied, the sub-workflow will panic with a nil
|
|
// store (compile-time check on the engine side).
|
|
_, _, _ = store.Get(ctx, "k")
|
|
return in + 1, nil
|
|
})
|
|
node := sub.AddLambdaNode("inc", lambda)
|
|
node.AddInput(compose.START)
|
|
sub.End().AddInput("inc")
|
|
|
|
shouldQuit := func(_ context.Context, _, _, next int) (bool, error) {
|
|
return next >= 2, nil
|
|
}
|
|
outer := compose.NewWorkflow[int, int]()
|
|
loopNode, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit,
|
|
WithLoopMaxIterations(5),
|
|
WithLoopCompileOptions(compose.WithCheckPointStore(store)),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("AddLoopNode: %v", err)
|
|
}
|
|
loopNode.AddInput(compose.START)
|
|
outer.End().AddInput("loop")
|
|
compiled, err := outer.Compile(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("compile: %v", err)
|
|
}
|
|
out, err := compiled.Invoke(context.Background(), 0)
|
|
if err != nil {
|
|
t.Fatalf("invoke: %v", err)
|
|
}
|
|
if out != 2 {
|
|
t.Errorf("output: got %d, want 2", out)
|
|
}
|
|
}
|
|
|
|
// TestOptions_NilChecks verifies that AddLoopNode rejects nil
|
|
// inputs up front, before any compile work happens.
|
|
func TestOptions_NilChecks(t *testing.T) {
|
|
sub := buildSubIncrement(t)
|
|
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
|
|
return true, nil
|
|
}
|
|
outer := compose.NewWorkflow[int, int]()
|
|
|
|
cases := []struct {
|
|
name string
|
|
fn func() error
|
|
}{
|
|
{"nil outer", func() error {
|
|
_, err := AddLoopNode(context.Background(), nil, "loop", sub, shouldQuit)
|
|
return err
|
|
}},
|
|
{"nil sub", func() error {
|
|
_, err := AddLoopNode(context.Background(), outer, "loop", nil, shouldQuit)
|
|
return err
|
|
}},
|
|
{"nil shouldQuit", func() error {
|
|
_, err := AddLoopNode(context.Background(), outer, "loop", sub, nil)
|
|
return err
|
|
}},
|
|
}
|
|
for _, c := range cases {
|
|
t.Run(c.name, func(t *testing.T) {
|
|
err := c.fn()
|
|
if err == nil {
|
|
t.Errorf("%s: expected error, got nil", c.name)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestOptions_CompileFailureIsolated asserts that when the sub-
|
|
// workflow fails to compile, AddLoopNode returns an error and the
|
|
// outer workflow is not modified to a state that would mask the
|
|
// failure.
|
|
//
|
|
// We construct a sub-workflow with no start node so compile fails
|
|
// deterministically.
|
|
func TestOptions_CompileFailureIsolated(t *testing.T) {
|
|
sub := compose.NewWorkflow[int, int]() // no nodes; compile will fail
|
|
shouldQuit := func(_ context.Context, _, _, _ int) (bool, error) {
|
|
return true, nil
|
|
}
|
|
outer := compose.NewWorkflow[int, int]()
|
|
_, err := AddLoopNode(context.Background(), outer, "loop", sub, shouldQuit)
|
|
if err == nil {
|
|
t.Fatal("expected compile error, got nil")
|
|
}
|
|
// The outer workflow should still be empty. Re-compiling it
|
|
// must fail with "start node not set", proving the loop
|
|
// didn't silently add a placeholder node.
|
|
_, err = outer.Compile(context.Background())
|
|
if err == nil || !strings.Contains(err.Error(), "start node not set") {
|
|
t.Errorf("outer workflow not in expected state: %v", err)
|
|
}
|
|
}
|
|
|
|
// TestOptions_SentinelErrorsExist is a smoke test that all four
|
|
// sentinel error values are non-nil. The behavioural assertions
|
|
// live in loop_test.go and loop_integration_test.go; this test
|
|
// pins the existence of the symbols so refactors cannot drop
|
|
// them silently.
|
|
func TestOptions_SentinelErrorsExist(t *testing.T) {
|
|
sentinels := map[string]error{
|
|
"ErrLoopMaxIterationsExceeded": ErrLoopMaxIterationsExceeded,
|
|
"ErrLoopSubGraphInterrupted": ErrLoopSubGraphInterrupted,
|
|
"ErrLoopResumeStateInvalid": ErrLoopResumeStateInvalid,
|
|
"ErrLoopQuitConditionFailed": ErrLoopQuitConditionFailed,
|
|
}
|
|
for name, e := range sentinels {
|
|
if e == nil {
|
|
t.Errorf("%s is nil", name)
|
|
}
|
|
}
|
|
// errors.Is round-trip: each sentinel must satisfy errors.Is
|
|
// against itself.
|
|
if !errors.Is(ErrLoopMaxIterationsExceeded, ErrLoopMaxIterationsExceeded) {
|
|
t.Error("ErrLoopMaxIterationsExceeded is not Is-self")
|
|
}
|
|
if !errors.Is(ErrLoopSubGraphInterrupted, ErrLoopSubGraphInterrupted) {
|
|
t.Error("ErrLoopSubGraphInterrupted is not Is-self")
|
|
}
|
|
}
|
|
|
|
// TestOptions_StreamModeResolveAfterUnknownMode asserts that an
|
|
// invalid mode followed by a valid one resolves to the second
|
|
// (later options take precedence).
|
|
func TestOptions_StreamModeResolveAfterUnknownMode(t *testing.T) {
|
|
opts := getLoopOptions([]LoopOption{
|
|
WithLoopStream(LoopStreamMode("garbage")),
|
|
WithLoopStream(LoopStreamEveryIteration),
|
|
})
|
|
if opts.streamMode != LoopStreamEveryIteration {
|
|
t.Errorf("got %q, want %q", opts.streamMode, LoopStreamEveryIteration)
|
|
}
|
|
}
|
|
|
|
// itoa formats n as a base-10 string. It exists so this test file does
// not need to import strconv for a single call; it is only used by the
// checkpoint-builder override test.
//
// The digits are produced from the unsigned magnitude of n. Negating a
// signed int is not safe for the most negative value (the result
// overflows and stays negative), whereas negating after conversion to
// uint wraps to the correct magnitude for every input.
func itoa(n int) string {
	negative := n < 0
	mag := uint(n)
	if negative {
		mag = -mag // wraps modulo 2^N, giving |n| even for the minimum int
	}

	// 20 bytes covers a sign plus the 19 digits of a 64-bit magnitude.
	var digits [20]byte
	pos := len(digits)
	// Emit at least one digit so that zero renders as "0".
	for {
		pos--
		digits[pos] = byte('0' + mag%10)
		mag /= 10
		if mag == 0 {
			break
		}
	}
	if negative {
		pos--
		digits[pos] = '-'
	}
	return string(digits[pos:])
}
|
|
|
|
// Reference schema so that its import, otherwise unused in this file,
// still compiles; it is kept for future stream-path tests.
|
|
var _ = schema.Pipe[int]
|