Files
ragflow/internal/agent/component/downstream_message_test.go
Zhichang Yu e45659868a feat(agent): ship the Go agent canvas port — eino interrupt/resume + Redis check-pointing (#16035)
Replaces the Python agent canvas runtime with a Go implementation that
runs inside `cmd/server_main`.

The canvas compiles into an eino Workflow that pauses on wait-for-user
via native Interrupt/Resume (no sentinel flag) and resumes from a
Redis-backed CheckPointStore.

All 21 Python agent components and ~35 tools are ported with functional
parity.

Sandbox providers now read their JSON config from the admin-panel
system_settings table with env fallback.

234 files / +35,413 / -6,111. All Go files are gofmt-clean (CI gate
added); drops the v2 DSL E2E step and the gap-analysis plan (both
redundant after the port ships).

## Type of change

- [x] Refactoring
- [x] New feature
- [x] Bug fix

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-17 13:24:03 +08:00

84 lines
2.9 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package component
import (
"context"
"testing"
"time"
)
// TestLLM_Stream_ExposesThinkingAndContentForDownstreamMessage is a
// regression guard for the Downstream Message detect path.
//
// Python's `_invoke_async` returns a `functools.partial` callable
// when it detects a Message component downstream, deferring
// streaming evaluation until the consumer actually pulls from
// the channel. The Go port exposes the streaming surface via the
// goroutine + channel + select pattern; this test pins the
// contract that any LLM `Stream()` consumer (Message component
// or otherwise) can rely on:
//
// 1. The stream emits a chunk with key "content" and key "thinking".
// 2. The stream eventually closes (no leaked goroutines).
// 3. The consumer can read at its own pace — backpressure is bounded
// by the 16-element channel buffer.
//
// The actual "detect Message downstream" decision is a canvas-scheduler
// concern (it would look at the DAG children of an LLM node). That
// introspection lives in `internal/agent/canvas/` rather than this
// component package. For v1, every LLM Stream() is the same shape
// regardless of downstream topology.
func TestLLM_Stream_ExposesThinkingAndContentForDownstreamMessage(t *testing.T) {
stub := &stubInvoker{resp: &ChatInvokeResponse{Content: "streamed answer", Model: "echo"}}
withStubInvoker(t, stub)
c := NewLLMComponent(LLMParam{ModelID: "echo"})
ch, err := c.Stream(context.Background(), map[string]any{"user_prompt": "go"})
if err != nil {
t.Fatalf("Stream: %v", err)
}
// Simulate a slow downstream consumer (the Message component
// template-rendering path) that reads one chunk at a time.
got := []map[string]any{}
deadline := time.After(2 * time.Second)
for {
select {
case chunk, open := <-ch:
if !open {
if len(got) != 2 {
t.Fatalf("expected 2 chunks (content + done), got %d: %+v", len(got), got)
}
if got[0]["content"] != "streamed answer" {
t.Errorf("content=%v, want 'streamed answer'", got[0]["content"])
}
if got[0]["thinking"] != "" {
t.Errorf("thinking=%v, want empty (no think chain in v1)", got[0]["thinking"])
}
if got[1]["done"] != true {
t.Errorf("done=%v, want true", got[1]["done"])
}
return
}
got = append(got, chunk)
case <-deadline:
t.Fatal("Stream did not close within 2s")
}
}
}