Files
ragflow/internal/agent/component/llm_retry_test.go
Zhichang Yu e45659868a feat(agent): ship the Go agent canvas port — eino interrupt/resume + Redis check-pointing (#16035)
Replaces the Python agent canvas runtime with a Go implementation that
runs inside `cmd/server_main`.

The canvas compiles into an eino Workflow that pauses on wait-for-user
via native Interrupt/Resume (no sentinel flag) and resumes from a
Redis-backed CheckPointStore.

All 21 Python agent components and ~35 tools are ported with functional
parity.

Sandbox providers now read their JSON config from the admin-panel
system_settings table with env fallback.

234 files / +35,413 / -6,111. All Go files are gofmt-clean (CI gate
added); drops the v2 DSL E2E step and the gap-analysis plan (both
redundant after the port ships).

## Type of change

- [x] Refactoring
- [x] New feature
- [x] Bug fix

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-17 13:24:03 +08:00

388 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package component — retry decorator tests.
//
// These tests exercise the retryInvoker wrapper directly. The wrapper
// is the chat-level retry loop introduced to mirror Python's
// max_retries/delay_after_error semantics (agent/component/llm.py,
// driven by LLMBundle in rag/llm/chat_model.py). Unlike the
// existing one-shot structured-output retry (in LLMComponent.Invoke),
// the retry loop lives at the ChatInvoker boundary so it covers
// every chat path: LLM, Agent, citation grounding.
package component
import (
"context"
"errors"
"strings"
"sync/atomic"
"testing"
"time"
)
// scriptedInvoker is a stub chat invoker with a fixed failure script:
// the first failTimes calls return err (callers match it with
// errors.Is), and every call after that returns resp. The call counter
// is updated atomically.
type scriptedInvoker struct {
	failTimes int32               // number of leading calls that fail
	err       error               // error returned by each failing call
	resp      *ChatInvokeResponse // response returned once the failures are used up
	calls     int32               // total Invoke calls; read via callCount
}

// Invoke counts the call, then fails while the call number is still
// within failTimes; later calls return the scripted response.
func (s *scriptedInvoker) Invoke(context.Context, ChatInvokeRequest) (*ChatInvokeResponse, error) {
	attempt := atomic.AddInt32(&s.calls, 1)
	if attempt > atomic.LoadInt32(&s.failTimes) {
		return s.resp, nil
	}
	return nil, s.err
}

// callCount reports how many times Invoke has been called so far.
func (s *scriptedInvoker) callCount() int {
	return int(atomic.LoadInt32(&s.calls))
}
// alwaysFailInvoker is a stub chat invoker that fails every call with
// err. The tests use it to drive the retry-exhaustion paths.
type alwaysFailInvoker struct {
	err   error // returned from every Invoke
	calls int32 // total Invoke calls; read via callCount
}

// Invoke records the call and unconditionally returns a.err.
func (a *alwaysFailInvoker) Invoke(context.Context, ChatInvokeRequest) (*ChatInvokeResponse, error) {
	atomic.AddInt32(&a.calls, 1)
	return nil, a.err
}

// callCount reports how many times Invoke has been called so far.
func (a *alwaysFailInvoker) callCount() int {
	return int(atomic.LoadInt32(&a.calls))
}
// TestRetryInvoker_SucceedsOnSecondAttempt scripts one transient failure
// followed by a success. The decorator must retry exactly once and hand
// back the successful response without surfacing the error.
func TestRetryInvoker_SucceedsOnSecondAttempt(t *testing.T) {
	expected := &ChatInvokeResponse{Content: "ok", Model: "m", Stopped: true}
	stub := &scriptedInvoker{
		failTimes: 1,
		err:       errors.New("transient"),
		resp:      expected,
	}
	retrying := newRetryInvoker(stub, 3, time.Millisecond)

	got, err := retrying.Invoke(context.Background(), ChatInvokeRequest{ModelName: "m"})
	if err != nil {
		t.Fatalf("Invoke: unexpected err: %v", err)
	}
	if got != expected {
		t.Errorf("resp=%v, want %v", got, expected)
	}
	if n := stub.callCount(); n != 2 {
		t.Errorf("inner.calls=%d, want 2 (1 fail + 1 success)", n)
	}
}
// TestRetryInvoker_ExhaustsRetries: every attempt fails, so the decorator
// must give up after maxRetries+1 attempts and return an error that wraps
// the last failure and mentions the retry count.
func TestRetryInvoker_ExhaustsRetries(t *testing.T) {
	const maxRetries = 3
	permanent := errors.New("permanent")
	stub := &alwaysFailInvoker{err: permanent}
	r := newRetryInvoker(stub, maxRetries, time.Millisecond)

	_, err := r.Invoke(context.Background(), ChatInvokeRequest{ModelName: "m"})
	if err == nil {
		t.Fatal("expected error after exhaustion")
	}
	if !errors.Is(err, permanent) {
		t.Errorf("err does not wrap sentinel: %v", err)
	}
	// One initial attempt plus maxRetries retries.
	if got, want := stub.callCount(), maxRetries+1; got != want {
		t.Errorf("inner.calls=%d, want %d (1 + maxRetries)", got, want)
	}
	if msg := err.Error(); !strings.Contains(msg, "3 retries") {
		t.Errorf("error message missing retry count: %q", msg)
	}
}
// TestRetryInvoker_HonorsContextCancellation: cancelling ctx while the
// decorator sleeps between attempts must abort the sleep and return
// ctx.Err() promptly instead of waiting out the full delay.
func TestRetryInvoker_HonorsContextCancellation(t *testing.T) {
	inner := &alwaysFailInvoker{err: errors.New("transient")}
	// A 30s delay makes an ignored cancellation obvious: Invoke would
	// block far past the 2s bound asserted below.
	r := newRetryInvoker(inner, 5, 30*time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path. cancel is idempotent, so
	// this is safe even after the timer below has already fired.
	defer cancel()
	// Cancel shortly after the first attempt fails, i.e. while the loop
	// is sleeping through its first backoff. time.AfterFunc replaces a
	// hand-rolled sleep-then-cancel goroutine, and Stop keeps the timer
	// from firing after the test has returned.
	timer := time.AfterFunc(20*time.Millisecond, cancel)
	defer timer.Stop()

	start := time.Now()
	_, err := r.Invoke(ctx, ChatInvokeRequest{ModelName: "m"})
	elapsed := time.Since(start)

	if err == nil {
		t.Fatal("expected error from cancelled context")
	}
	if !errors.Is(err, context.Canceled) {
		t.Errorf("err=%v, want context.Canceled", err)
	}
	// Generous upper bound: the cancel must land well before the 30s
	// backoff would have elapsed.
	if elapsed > 2*time.Second {
		t.Errorf("Invoke took %v, expected < 2s with prompt ctx cancel", elapsed)
	}
	// The first call happens, then the cancel arrives during the first
	// backoff, so the loop should not have made more than 2 calls.
	if got := inner.callCount(); got > 2 {
		t.Errorf("inner.calls=%d, want <= 2 (ctx cancel should abort backoff)", got)
	}
}
// TestRetryInvoker_ExponentialBackoff measures the total elapsed time of
// a 3-retry loop with a 20ms initial delay. Doubling gives
// 20 + 40 + 80 = 140ms of pure sleep, while a constant delay would give
// only 60ms. The test asserts a lower bound just under 140ms (proving
// doubling) and a generous upper bound for slow CI.
//
// The timing bound only proves doubling if exactly three backoffs
// happened, so the attempt count is asserted as well.
func TestRetryInvoker_ExponentialBackoff(t *testing.T) {
	inner := &alwaysFailInvoker{err: errors.New("transient")}
	const initial = 20 * time.Millisecond
	r := newRetryInvoker(inner, 3, initial)

	start := time.Now()
	_, err := r.Invoke(context.Background(), ChatInvokeRequest{ModelName: "m"})
	elapsed := time.Since(start)

	if err == nil {
		t.Fatal("expected error after exhausting retries")
	}
	// 1 initial attempt + 3 retries; anything else invalidates the
	// timing argument below.
	if got, want := inner.callCount(), 4; got != want {
		t.Fatalf("inner.calls=%d, want %d (1 + 3 retries)", got, want)
	}
	// 20 + 40 + 80 = 140ms of backoff. 130ms is the lower bound, to avoid
	// CI flakes from clock granularity. The upper bound of 2s allows for
	// very slow CI.
	if elapsed < 130*time.Millisecond {
		t.Errorf("elapsed=%v, want >= 130ms (proves doubling, not constant)", elapsed)
	}
	if elapsed > 2*time.Second {
		t.Errorf("elapsed=%v, want < 2s", elapsed)
	}
}
// TestRetryInvoker_NoRetriesWhenZero: maxRetries=0 means exactly one
// attempt, so a failure comes straight back with no retry. This mirrors
// LLMParam.MaxRetries=0 for latency-sensitive flows.
func TestRetryInvoker_NoRetriesWhenZero(t *testing.T) {
	stub := &alwaysFailInvoker{err: errors.New("nope")}
	r := newRetryInvoker(stub, 0, 50*time.Millisecond)

	if _, err := r.Invoke(context.Background(), ChatInvokeRequest{ModelName: "m"}); err == nil {
		t.Fatal("expected error")
	}
	if calls := stub.callCount(); calls != 1 {
		t.Errorf("inner.calls=%d, want 1 (no retries)", calls)
	}
}
// TestRetryInvoker_NilInner covers the defensive nil check: a decorator
// built around a nil inner invoker must report an error from Invoke
// rather than panic.
func TestRetryInvoker_NilInner(t *testing.T) {
	r := newRetryInvoker(nil, 3, time.Millisecond)
	if _, err := r.Invoke(context.Background(), ChatInvokeRequest{ModelName: "m"}); err == nil {
		t.Fatal("expected error for nil inner")
	}
}
// TestLLMParam_RespectsMaxRetries is the integration test for the
// param-override path through resolveChatInvoker. An LLMComponent with
// MaxRetries=5 and an always-failing invoker must give up after
// 1 initial + 5 retries = 6 attempts.
func TestLLMParam_RespectsMaxRetries(t *testing.T) {
	inner := &alwaysFailInvoker{err: errors.New("downstream dead")}
	withStubInvoker(t, inner)

	c := NewLLMComponent(LLMParam{ModelID: "m", MaxRetries: 5})
	// The zero-value DelayAfterError falls back to a 2s default, which
	// would make this test slow, so shrink it directly on the param.
	c.param.DelayAfterError = time.Millisecond

	if _, err := c.Invoke(context.Background(), map[string]any{"user_prompt": "x"}); err == nil {
		t.Fatal("expected error from exhausted retries")
	}
	// One initial attempt plus five retries.
	if got, want := inner.callCount(), 6; got != want {
		t.Errorf("inner.calls=%d, want %d", got, want)
	}
}
// TestLLMParam_ZeroRetriesMeansOneAttempt: an explicit MaxRetries=0
// override must bypass retries entirely and make exactly one attempt.
//
// MaxRetries=0 on its own is indistinguishable from "no override". With
// DelayAfterError also zero, resolveChatInvoker returns the package
// default (3 retries). Setting DelayAfterError to a non-zero sentinel
// makes resolveChatInvoker wrap the default in a new retryInvoker with
// maxRetries=0, which is what this test needs.
func TestLLMParam_ZeroRetriesMeansOneAttempt(t *testing.T) {
	inner := &alwaysFailInvoker{err: errors.New("once")}
	withStubInvoker(t, inner)

	c := NewLLMComponent(LLMParam{ModelID: "m", MaxRetries: 0})
	// Non-zero sentinel that turns MaxRetries=0 into a real override.
	c.param.DelayAfterError = time.Millisecond

	if _, err := c.Invoke(context.Background(), map[string]any{"user_prompt": "x"}); err == nil {
		t.Fatal("expected error")
	}
	if calls := inner.callCount(); calls != 1 {
		t.Errorf("inner.calls=%d, want 1 (zero retries)", calls)
	}
}
// TestLLMParam_DefaultRetries: with MaxRetries and DelayAfterError both
// zero (the v1 fixture default), the component has no retry override.
// It must therefore call the chat invoker installed as the package
// default unchanged, including that invoker's own retry decorator. This
// protects against regressions where a future change accidentally
// bypasses the retry loop on the hot path.
//
// The production default decorator (retryInvokerDefaultRetries with a 2s
// initial delay) is too slow for a unit test. The test therefore installs
// a fast stand-in (fastRetries retries, 1ms delay) via
// SetDefaultChatInvoker and asserts that the stand-in's full budget was
// spent. It does not assert the production default's retry count itself.
func TestLLMParam_DefaultRetries(t *testing.T) {
	inner := &alwaysFailInvoker{err: errors.New("flaky")}
	withStubInvoker(t, inner)
	c := NewLLMComponent(LLMParam{ModelID: "m"})

	// Install the fast stand-in after withStubInvoker, so that it is the
	// default resolveChatInvoker picks up when the component runs.
	const fastRetries = 2
	fastInner := &alwaysFailInvoker{err: errors.New("flaky")}
	SetDefaultChatInvoker(newRetryInvoker(fastInner, fastRetries, time.Millisecond))
	// NOTE(review): this resets the default to nil. t.Cleanup runs in
	// LIFO order, so any restore that withStubInvoker registered runs
	// after this one. Confirm that helper restores the previous default.
	t.Cleanup(func() { SetDefaultChatInvoker(nil) })

	_, err := c.Invoke(context.Background(), map[string]any{"user_prompt": "x"})
	if err == nil {
		t.Fatal("expected error after default retries")
	}
	// 1 initial attempt + fastRetries retries = 3.
	if got, want := fastInner.callCount(), fastRetries+1; got != want {
		t.Errorf("inner.calls=%d, want %d", got, want)
	}
	// inner was displaced as the default before Invoke ran, so
	// resolveChatInvoker must never have reached it.
	if got := inner.callCount(); got != 0 {
		t.Errorf("unused inner.calls=%d, want 0", got)
	}
}
// TestUnwrapChatInvoker_StripsSingleRetryLayer: unwrapChatInvoker must
// peel one retryInvoker layer off to expose the bare invoker underneath.
// That lets the param-override path install a fresh retryInvoker with
// the operator's literal MaxRetries, instead of multiplicatively stacking
// on the boot retry.
func TestUnwrapChatInvoker_StripsSingleRetryLayer(t *testing.T) {
	bare := &alwaysFailInvoker{err: errors.New("bare")}
	got := unwrapChatInvoker(newRetryInvoker(bare, 3, time.Millisecond))
	if got != bare {
		t.Errorf("unwrapChatInvoker(retry(bare)) = %v, want %v (bare invoker)", got, bare)
	}
}
// TestUnwrapChatInvoker_NoRetryLayer: an invoker with no retry layer must
// pass through unwrapChatInvoker unchanged, neither wrapped nor modified.
func TestUnwrapChatInvoker_NoRetryLayer(t *testing.T) {
	bare := &alwaysFailInvoker{err: errors.New("bare")}
	got := unwrapChatInvoker(bare)
	if got == bare {
		return
	}
	t.Errorf("unwrapChatInvoker(bare) = %v, want %v (unchanged passthrough)", got, bare)
}
// TestUnwrapChatInvoker_StripsMultipleLayers is the defensive case.
// Production installs only one retryInvoker, but a pathological caller
// could nest them, and unwrapChatInvoker must then walk down to the first
// non-retryInvoker layer, the bare invoker.
func TestUnwrapChatInvoker_StripsMultipleLayers(t *testing.T) {
	bare := &alwaysFailInvoker{err: errors.New("bare")}
	innerLayer := newRetryInvoker(bare, 3, time.Millisecond)
	outerLayer := newRetryInvoker(innerLayer, 3, time.Millisecond)
	if got := unwrapChatInvoker(outerLayer); got != bare {
		t.Errorf("unwrapChatInvoker(retry(retry(bare))) = %v, want %v (bare invoker)", got, bare)
	}
}
// TestLLM_ParamOverride_AbsoluteCount_NotStacked checks that a MaxRetries
// override is an absolute attempt budget, not a multiplier on the
// boot-time retry decorator.
//
// The setup mirrors production boot. A retryInvoker with MaxRetries=3
// (4 attempts per call) wraps an always-failing invoker and is installed
// as the default. An LLMComponent with MaxRetries=5 then runs. The
// override path unwraps the boot layer before installing its own
// decorator, so the bare invoker sees exactly MaxRetries+1 = 6 calls. If
// the layers stacked, it would see (3+1)*(5+1) = 24. A regression that
// re-introduces stacking (e.g. dropping the unwrapChatInvoker call)
// therefore fails this test.
func TestLLM_ParamOverride_AbsoluteCount_NotStacked(t *testing.T) {
	bare := &alwaysFailInvoker{err: errors.New("downstream dead")}
	// Boot layer: 3 retries around the bare invoker.
	withStubInvoker(t, newRetryInvoker(bare, 3, time.Millisecond))

	c := NewLLMComponent(LLMParam{
		ModelID:    "m",
		MaxRetries: 5,
	})
	// Keep the override's backoff tiny so the test stays fast.
	c.param.DelayAfterError = time.Millisecond

	_, err := c.Invoke(context.Background(), map[string]any{"user_prompt": "x"})
	if err == nil {
		t.Fatal("expected error from exhausted retries")
	}
	// Unwrapped: 1 + 5 = 6 bare calls. Stacked: 6 outer × 4 inner = 24.
	got, want := bare.callCount(), 6
	if got != want {
		t.Errorf("bare.calls=%d, want %d (absolute count, not stacked). If you see 24, the multiplicative-stacking regression has been re-introduced.", got, want)
	}
}
// TestLLM_NoParamOverride_StackingPreserved is the back-compat companion
// to the absolute-count test. With MaxRetries=0 and DelayAfterError=0
// there is no param override, so the boot retry chain must run unchanged,
// and existing DSLs that rely on the implicit 3-retry budget keep working.
//
// A future change that unwraps even when no override is set would
// silence the boot retry chain and fail this test.
func TestLLM_NoParamOverride_StackingPreserved(t *testing.T) {
	bare := &alwaysFailInvoker{err: errors.New("downstream dead")}
	// Boot layer configured like the production default: 3 retries.
	boot := newRetryInvoker(bare, 3, time.Millisecond)
	withStubInvoker(t, boot)

	// Zero-valued MaxRetries and DelayAfterError: no param override.
	c := NewLLMComponent(LLMParam{ModelID: "m"})
	if _, err := c.Invoke(context.Background(), map[string]any{"user_prompt": "x"}); err == nil {
		t.Fatal("expected error from exhausted retries")
	}
	// The untouched boot layer makes 1 initial + 3 retry calls.
	const want = 4
	if got := bare.callCount(); got != want {
		t.Errorf("bare.calls=%d, want %d (boot layer ran unchanged — no param override means we keep the implicit 3-retry budget)", got, want)
	}
}