mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Replaces the Python agent canvas runtime with a Go implementation that runs inside `cmd/server_main`. The canvas compiles into an eino Workflow that pauses on wait-for-user via native Interrupt/Resume (no sentinel flag) and resumes from a Redis-backed CheckPointStore. All 21 Python agent components and ~35 tools are ported with functional parity. Sandbox providers now read their JSON config from the admin-panel system_settings table with env fallback. 234 files / +35,413 / -6,111. All Go files are gofmt-clean (CI gate added); drops the v2 DSL E2E step and the gap-analysis plan (both redundant after the port ships). ## Type of change - [x] Refactoring - [x] New feature - [x] Bug fix 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude <noreply@anthropic.com>
135 lines
4.1 KiB
Go
135 lines
4.1 KiB
Go
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
package component
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// TestLLM_Stream_HappyPath: Stream emits content + done chunks, channel
|
|
// closes.
|
|
func TestLLM_Stream_HappyPath(t *testing.T) {
|
|
stub := &stubInvoker{resp: &ChatInvokeResponse{Content: "hello", Model: "echo"}}
|
|
withStubInvoker(t, stub)
|
|
|
|
c := NewLLMComponent(LLMParam{ModelID: "echo"})
|
|
ch, err := c.Stream(context.Background(), map[string]any{"user_prompt": "hi"})
|
|
if err != nil {
|
|
t.Fatalf("Stream: %v", err)
|
|
}
|
|
var got []map[string]any
|
|
for chunk := range ch {
|
|
got = append(got, chunk)
|
|
}
|
|
if len(got) != 2 {
|
|
t.Fatalf("expected 2 chunks (content + done), got %d: %+v", len(got), got)
|
|
}
|
|
// First chunk: content with empty thinking
|
|
if got[0]["content"] != "hello" {
|
|
t.Errorf("chunk[0].content=%v, want hello", got[0]["content"])
|
|
}
|
|
if got[0]["thinking"] != "" {
|
|
t.Errorf("chunk[0].thinking=%v, want empty", got[0]["thinking"])
|
|
}
|
|
// Second chunk: done
|
|
if got[1]["done"] != true {
|
|
t.Errorf("chunk[1].done=%v, want true", got[1]["done"])
|
|
}
|
|
if got[1]["model"] != "echo" {
|
|
t.Errorf("chunk[1].model=%v, want echo", got[1]["model"])
|
|
}
|
|
}
|
|
|
|
// TestLLM_Stream_Error: when Invoke returns an error, Stream emits an
|
|
// error chunk and closes.
|
|
func TestLLM_Stream_Error(t *testing.T) {
|
|
stub := &stubInvoker{err: context.DeadlineExceeded}
|
|
withStubInvoker(t, stub)
|
|
|
|
c := NewLLMComponent(LLMParam{ModelID: "echo"})
|
|
ch, err := c.Stream(context.Background(), map[string]any{"user_prompt": "hi"})
|
|
if err != nil {
|
|
t.Fatalf("Stream: %v", err)
|
|
}
|
|
var got []map[string]any
|
|
for chunk := range ch {
|
|
got = append(got, chunk)
|
|
}
|
|
if len(got) != 1 {
|
|
t.Fatalf("expected 1 error chunk, got %d: %+v", len(got), got)
|
|
}
|
|
if _, ok := got[0]["error"]; !ok {
|
|
t.Errorf("error chunk missing 'error' key; got: %+v", got[0])
|
|
}
|
|
}
|
|
|
|
// TestLLM_Stream_RespectsCancellation: pre-cancelled context causes
|
|
// Stream to exit before producing chunks.
|
|
func TestLLM_Stream_RespectsCancellation(t *testing.T) {
|
|
stub := &stubInvoker{resp: &ChatInvokeResponse{Content: "hi", Model: "echo"}}
|
|
withStubInvoker(t, stub)
|
|
|
|
c := NewLLMComponent(LLMParam{ModelID: "echo"})
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel() // pre-cancel
|
|
|
|
ch, err := c.Stream(ctx, map[string]any{"user_prompt": "hi"})
|
|
if err != nil {
|
|
t.Fatalf("Stream: %v", err)
|
|
}
|
|
var got []map[string]any
|
|
for chunk := range ch {
|
|
got = append(got, chunk)
|
|
}
|
|
// No chunks should be received (cancel was already triggered).
|
|
if len(got) != 0 {
|
|
t.Errorf("expected 0 chunks after pre-cancel, got %d: %+v", len(got), got)
|
|
}
|
|
}
|
|
|
|
// TestLLM_Stream_BufferDoesNotBlock: the channel buffer is large
|
|
// enough that the producer doesn't block on a slow consumer for the
|
|
// small N of chunks this v1 emits. (A real streaming integration
|
|
// would need to handle backpressure more carefully; for v1 the
|
|
// buffer of 16 is way more than the 1-2 chunks per call.)
|
|
func TestLLM_Stream_BufferDoesNotBlock(t *testing.T) {
|
|
stub := &stubInvoker{resp: &ChatInvokeResponse{Content: "ok", Model: "echo"}}
|
|
withStubInvoker(t, stub)
|
|
|
|
c := NewLLMComponent(LLMParam{ModelID: "echo"})
|
|
ch, err := c.Stream(context.Background(), map[string]any{"user_prompt": "hi"})
|
|
if err != nil {
|
|
t.Fatalf("Stream: %v", err)
|
|
}
|
|
// Drain with a timeout so the test fails loudly if the producer
|
|
// deadlocks. Each chunk should arrive within milliseconds.
|
|
done := make(chan struct{})
|
|
go func() {
|
|
for range ch {
|
|
}
|
|
close(done)
|
|
}()
|
|
select {
|
|
case <-done:
|
|
// OK
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Stream did not drain within 2s — likely deadlock")
|
|
}
|
|
}
|