mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
fix(upstage): extract reasoning delta from streaming responses (#14817)
### What problem does this PR solve? `UpstageModel.ChatStreamlyWithSender` (in the driver merged via #14819) only extracted `delta.content` from each SSE event. For the `solar-pro3` reasoning family (and any future Upstage model that follows the same wire shape), the chain-of-thought is streamed in a **separate `delta.reasoning` field**, and the driver was silently dropping all of it. The non-streaming path already extracts `message.reasoning` into `ChatResponse.ReasonContent` (added earlier in this PR's history), so the same model produced **inconsistent behavior** between streaming and non-streaming: a tenant calling `solar-pro3` with `reasoning_effort: high` would see the reasoning trace if they used `ChatWithMessages` but not if they used `ChatStreamlyWithSender`. ### Live evidence Probed against `api.upstage.ai/v1/chat/completions` with `solar-pro3` + `reasoning_effort: high` + `stream: true` (8000-token budget so the reasoning has room to finish): ``` $ curl -sN -H "Authorization: Bearer <key>" -H "Content-Type: application/json" \ -X POST https://api.upstage.ai/v1/chat/completions \ -d '{"model":"solar-pro3","messages":[{"role":"user","content":"Compute 15% of 80."}], "max_tokens":8000,"stream":true,"reasoning_effort":"high"}' # across 168 SSE events: # delta keys seen: [content reasoning role] # delta.content total len: 121 chars (the visible answer) # delta.reasoning total len: 159 chars (the chain-of-thought) <- driver dropped this ``` A representative pair of events, one carrying each field: ```json data: {"choices":[{"index":0,"delta":{"reasoning":"15% = 0.15."}}]} data: {"choices":[{"index":0,"delta":{"content":"15% of 80 is "}}]} ``` The 159 chars of reasoning were arriving on the wire and being thrown away. `solar-pro2` was also probed (625 events); it does **not** emit `delta.reasoning` — its reasoning is inlined into `delta.content` — so this change is a no-op for it and for `solar-mini`. 
### What this PR includes - `internal/entity/models/upstage.go`: in the SSE scanner loop, extract `delta.reasoning` before `delta.content` and forward each non-empty chunk via the sender's second arg (the existing `reasonContent` channel the non-stream path already populates). The ordering contract is documented inline: reasoning chunks within a single SSE event are emitted before content chunks, so a UI that pipes both sees the chain-of-thought start before the answer for that token, matching the wire order Upstage emits. - `internal/entity/models/upstage_test.go`: three new tests pinning the new behavior: - `TestUpstageStreamExtractsReasoningDelta` — reasoning + content forwarded to the right sender args; one-of invariant per call - `TestUpstageStreamReasoningChunksArriveBeforeContent` — ordering pinned within a single SSE event that carries both fields - `TestUpstageStreamWithoutReasoningStillWorks` — regression net: non-reasoning models (`solar-mini`, `solar-pro2`) continue to work; the reason callback never fires No interface change. No factory change. No config change. ### How was this tested? ``` $ go test -vet=off -run TestUpstage -count=1 -v ./internal/entity/models/... ... (existing tests 1..9 still pass) ... === RUN TestUpstageStreamExtractsReasoningDelta --- PASS: TestUpstageStreamExtractsReasoningDelta (0.01s) === RUN TestUpstageStreamReasoningChunksArriveBeforeContent --- PASS: TestUpstageStreamReasoningChunksArriveBeforeContent (0.01s) === RUN TestUpstageStreamWithoutReasoningStillWorks --- PASS: TestUpstageStreamWithoutReasoningStillWorks (0.00s) PASS ok ragflow/internal/entity/models 0.034s ``` 12/12 Upstage tests pass on Go 1.25. `go build ./internal/entity/models/...` exits 0. 
**Live integration test** (smoke test not committed) — the patched driver was run directly against `api.upstage.ai/v1` with the same prompt that produced the curl evidence above: ``` === RUN TestUpstageStreamReasoningLiveSmoke [OK] visible content: 50 chunks, 84 chars [OK] reasoning: 39 chunks, 90 chars content head 200: "\\(15\\% = \\frac{15}{100}=0.15\\).\n\n\\[\n0.15 \\times 80 = 12.\n\\]\n\n**15 % of 80 is 12.**" reasoning head 200: "We need to compute 15% of 80. That's 0.15 * 80 = 12. So answer is 12. Provide explanation." UPSTAGE STREAM REASONING SMOKE PASSED --- PASS: TestUpstageStreamReasoningLiveSmoke (1.97s) ``` Before this fix, the same call would have produced **0 reasoning chunks**. The 90 chars of reasoning that the patched driver now surfaces are the chain-of-thought solar-pro3 emits when reasoning_effort is high. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -356,6 +356,19 @@ func (u *UpstageModel) ChatStreamlyWithSender(modelName string, messages []Messa
|
||||
continue
|
||||
}
|
||||
|
||||
// Reasoning chunks first, content second. Upstage's solar-pro3
|
||||
// stream interleaves both fields within the same SSE event when
|
||||
// reasoning_effort is medium or high; emit reasoning before the
|
||||
// visible answer so callers that pipe both into a UI see the
|
||||
// chain-of-thought start before the answer, matching the wire
|
||||
// ordering. solar-pro2 inlines reasoning into delta.content and
|
||||
// never sets delta.reasoning, so this block is a no-op for it.
|
||||
if r, ok := delta["reasoning"].(string); ok && r != "" {
|
||||
if err := sender(nil, &r); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
content, ok := delta["content"].(string)
|
||||
if ok && content != "" {
|
||||
if err := sender(&content, nil); err != nil {
|
||||
|
||||
@@ -269,3 +269,155 @@ func TestUpstageEmbedHappyPathReordersByIndex(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- streaming reasoning delta extraction ----------
|
||||
|
||||
// TestUpstageStreamExtractsReasoningDelta verifies that the SSE parser
|
||||
// forwards `delta.reasoning` chunks via the sender's second arg (the
|
||||
// reasonContent channel) and `delta.content` chunks via the first arg.
|
||||
// The fixture is modelled on the shape captured live from solar-pro3 with
|
||||
// reasoning_effort=high. Every event in this fixture carries at most one
|
||||
// of the two fields: a role-only delta, three reasoning-only deltas, two
// content-only deltas, then the [DONE] terminator. The case where one event
// carries both fields is exercised by
// TestUpstageStreamReasoningChunksArriveBeforeContent.
|
||||
func TestUpstageStreamExtractsReasoningDelta(t *testing.T) {
|
||||
// Stub endpoint: ignores the request and replies 200 with a fixed SSE body.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = io.WriteString(w,
|
||||
`data: {"choices":[{"index":0,"delta":{"role":"assistant"}}]}`+"\n"+
|
||||
`data: {"choices":[{"index":0,"delta":{"reasoning":"We need "}}]}`+"\n"+
|
||||
`data: {"choices":[{"index":0,"delta":{"reasoning":"to compute. "}}]}`+"\n"+
|
||||
`data: {"choices":[{"index":0,"delta":{"reasoning":"15% = 0.15."}}]}`+"\n"+
|
||||
`data: {"choices":[{"index":0,"delta":{"content":"15% of 80 "}}]}`+"\n"+
|
||||
`data: {"choices":[{"index":0,"delta":{"content":"is 12."},"finish_reason":"stop"}]}`+"\n"+
|
||||
`data: [DONE]`+"\n",
|
||||
)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
u := newUpstageForTest(srv.URL)
|
||||
apiKey := "test-key"
|
||||
// Chunks are collected per channel, in the order the sender is invoked.
var contentChunks, reasoningChunks []string
|
||||
err := u.ChatStreamlyWithSender("solar-pro3",
|
||||
[]Message{{Role: "user", Content: "What is 15% of 80?"}},
|
||||
&APIConfig{ApiKey: &apiKey}, nil,
|
||||
func(content *string, reason *string) error {
|
||||
// At most one of (content, reason) is set per call: callers
|
||||
// need this contract to route to the right UI channel.
|
||||
if content != nil && reason != nil {
|
||||
t.Errorf("sender called with both content and reason non-nil")
|
||||
}
|
||||
// Empty strings and a literal "[DONE]" are not counted as answer text.
// NOTE(review): presumably the driver may forward the stream
// terminator through the content arg — confirm against the driver.
if content != nil && *content != "" && *content != "[DONE]" {
|
||||
contentChunks = append(contentChunks, *content)
|
||||
}
|
||||
if reason != nil && *reason != "" {
|
||||
reasoningChunks = append(reasoningChunks, *reason)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("stream: %v", err)
|
||||
}
|
||||
// Joined chunks must reproduce the fixture text exactly, in wire order.
wantReason := "We need to compute. 15% = 0.15."
|
||||
wantContent := "15% of 80 is 12."
|
||||
if got := strings.Join(reasoningChunks, ""); got != wantReason {
|
||||
t.Errorf("joined reasoning=%q want %q", got, wantReason)
|
||||
}
|
||||
if got := strings.Join(contentChunks, ""); got != wantContent {
|
||||
t.Errorf("joined content=%q want %q", got, wantContent)
|
||||
}
|
||||
// One non-empty reason call per reasoning event in the fixture. The
// content chunk count is not pinned; content is only checked after joining.
if len(reasoningChunks) != 3 {
|
||||
t.Errorf("expected 3 reasoning chunks, got %d", len(reasoningChunks))
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpstageStreamReasoningChunksArriveBeforeContent verifies the
|
||||
// ordering contract inside a single SSE event when both fields are
|
||||
// present: reasoning is forwarded first so a UI consuming both can
|
||||
// render the chain-of-thought before the answer for that token.
// NOTE(review): the recording callback below appends a non-empty reason
// before a non-empty content within one invocation, so the expected
// sequence would also be produced if both pointers arrived in a single
// sender call. This test does not assert that they arrive in separate calls.
|
||||
func TestUpstageStreamReasoningChunksArriveBeforeContent(t *testing.T) {
|
||||
// Stub endpoint: ignores the request and replies 200 with a fixed SSE body.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = io.WriteString(w,
|
||||
// One SSE event carries BOTH reasoning and content in the
|
||||
// same delta. The driver must forward reasoning first.
|
||||
`data: {"choices":[{"index":0,"delta":{"reasoning":"R1","content":"C1"}}]}`+"\n"+
|
||||
`data: {"choices":[{"index":0,"delta":{"content":"C2"},"finish_reason":"stop"}]}`+"\n"+
|
||||
`data: [DONE]`+"\n",
|
||||
)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
u := newUpstageForTest(srv.URL)
|
||||
apiKey := "test-key"
|
||||
// seq records every non-empty chunk in arrival order, tagged "R:" for
// reasoning and "C:" for content.
var seq []string
|
||||
err := u.ChatStreamlyWithSender("solar-pro3",
|
||||
[]Message{{Role: "user", Content: "x"}},
|
||||
&APIConfig{ApiKey: &apiKey}, nil,
|
||||
func(content *string, reason *string) error {
|
||||
if reason != nil && *reason != "" {
|
||||
seq = append(seq, "R:"+*reason)
|
||||
}
|
||||
// Empty strings and a literal "[DONE]" are not recorded as content.
if content != nil && *content != "" && *content != "[DONE]" {
|
||||
seq = append(seq, "C:"+*content)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("stream: %v", err)
|
||||
}
|
||||
// R1 and C1 come from the same event; C2 comes from the following one.
wantSeq := []string{"R:R1", "C:C1", "C:C2"}
|
||||
// Length is checked first (fatally) so the indexed comparison below
// cannot go out of range.
if len(seq) != len(wantSeq) {
|
||||
t.Fatalf("seq=%v want %v", seq, wantSeq)
|
||||
}
|
||||
for i, want := range wantSeq {
|
||||
if seq[i] != want {
|
||||
t.Errorf("seq[%d]=%q want %q (full=%v)", i, seq[i], want, seq)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpstageStreamWithoutReasoningStillWorks is the regression net:
|
||||
// non-reasoning models (solar-mini, solar-pro2 with no reasoning_effort)
|
||||
// emit only delta.content. The driver must not regress on them.
// The fixture holds two content-only events; the test expects the joined
// content to be "Hello world" and no non-empty reason to reach the sender.
|
||||
func TestUpstageStreamWithoutReasoningStillWorks(t *testing.T) {
|
||||
// Stub endpoint: ignores the request and replies 200 with a fixed SSE body.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = io.WriteString(w,
|
||||
`data: {"choices":[{"index":0,"delta":{"content":"Hello "}}]}`+"\n"+
|
||||
`data: {"choices":[{"index":0,"delta":{"content":"world"},"finish_reason":"stop"}]}`+"\n"+
|
||||
`data: [DONE]`+"\n",
|
||||
)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
u := newUpstageForTest(srv.URL)
|
||||
apiKey := "test-key"
|
||||
var content []string
|
||||
// Set only when the sender receives a non-nil, non-empty reason; a nil
// or empty-string reason does not trip it.
var reasonCalled bool
|
||||
err := u.ChatStreamlyWithSender("solar-mini",
|
||||
[]Message{{Role: "user", Content: "x"}},
|
||||
&APIConfig{ApiKey: &apiKey}, nil,
|
||||
func(c *string, r *string) error {
|
||||
if r != nil && *r != "" {
|
||||
reasonCalled = true
|
||||
}
|
||||
// Empty strings and a literal "[DONE]" are not collected as content.
if c != nil && *c != "" && *c != "[DONE]" {
|
||||
content = append(content, *c)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("stream: %v", err)
|
||||
}
|
||||
if reasonCalled {
|
||||
t.Errorf("reasoning callback fired despite no delta.reasoning in fixture")
|
||||
}
|
||||
if got := strings.Join(content, ""); got != "Hello world" {
|
||||
t.Errorf("content=%q want %q", got, "Hello world")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user