From 38a8bc3dab456829e41563aa5ffd1574f46167a7 Mon Sep 17 00:00:00 2001 From: tmimmanuel <14046872+tmimmanuel@users.noreply.github.com> Date: Wed, 20 May 2026 21:20:37 -1000 Subject: [PATCH] fix(upstage): extract reasoning delta from streaming responses (#14817) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? `UpstageModel.ChatStreamlyWithSender` (in the driver merged via #14819) only extracted `delta.content` from each SSE event. For the `solar-pro3` reasoning family (and any future Upstage model that follows the same wire shape), the chain-of-thought is streamed in a **separate `delta.reasoning` field**, and the driver was silently dropping all of it. The non-streaming path already extracts `message.reasoning` into `ChatResponse.ReasonContent` (added earlier in this PR's history), so the same model produced **inconsistent behavior** between streaming and non-streaming: a tenant calling `solar-pro3` with `reasoning_effort: high` would see the reasoning trace if they used `ChatWithMessages` but not if they used `ChatStreamlyWithSender`. ### Live evidence Probed against `api.upstage.ai/v1/chat/completions` with `solar-pro3` + `reasoning_effort: high` + `stream: true` (8000-token budget so the reasoning has room to finish): ``` $ curl -sN -H "Authorization: Bearer " -H "Content-Type: application/json" \ -X POST https://api.upstage.ai/v1/chat/completions \ -d '{"model":"solar-pro3","messages":[{"role":"user","content":"Compute 15% of 80."}], "max_tokens":8000,"stream":true,"reasoning_effort":"high"}' # across 168 SSE events: # delta keys seen: [content reasoning role] # delta.content total len: 121 chars (the visible answer) # delta.reasoning total len: 159 chars (the chain-of-thought) <- driver dropped this ``` A representative event showing both fields side by side: ```json data: {"choices":[{"index":0,"delta":{"reasoning":"15% = 0.15."}}]} data: {"choices":[{"index":0,"delta":{"content":"15% of 80 is "}}]} ``` The 159 chars of reasoning were arriving on the wire and being thrown away. `solar-pro2` was also probed (625 events); it does **not** emit `delta.reasoning` — its reasoning is inlined into `delta.content` — so this change is a no-op for it and for `solar-mini`. ### What this PR includes - `internal/entity/models/upstage.go`: in the SSE scanner loop, extract `delta.reasoning` before `delta.content` and forward each non-empty chunk via the sender's second arg (the existing `reasonContent` channel the non-stream path already populates). The ordering contract is documented inline: reasoning chunks within a single SSE event are emitted before content chunks, so a UI that pipes both sees the chain-of-thought start before the answer for that token, matching the wire order Upstage emits. - `internal/entity/models/upstage_test.go`: three new tests pinning the new behavior: - `TestUpstageStreamExtractsReasoningDelta` — reasoning + content forwarded to the right sender args; one-of invariant per call - `TestUpstageStreamReasoningChunksArriveBeforeContent` — ordering pinned within a single SSE event that carries both fields - `TestUpstageStreamWithoutReasoningStillWorks` — regression net: non-reasoning models (`solar-mini`, `solar-pro2`) continue to work; the reason callback never fires No interface change. No factory change. No config change. ### How was this tested? ``` $ go test -vet=off -run TestUpstage -count=1 -v ./internal/entity/models/... ... (existing tests 1..9 still pass) ... === RUN TestUpstageStreamExtractsReasoningDelta --- PASS: TestUpstageStreamExtractsReasoningDelta (0.01s) === RUN TestUpstageStreamReasoningChunksArriveBeforeContent --- PASS: TestUpstageStreamReasoningChunksArriveBeforeContent (0.01s) === RUN TestUpstageStreamWithoutReasoningStillWorks --- PASS: TestUpstageStreamWithoutReasoningStillWorks (0.00s) PASS ok ragflow/internal/entity/models 0.034s ``` 12/12 Upstage tests pass on go 1.25. `go build ./internal/entity/models/...` exits 0. **Live integration test** (smoke test not committed) — the patched driver was run directly against `api.upstage.ai/v1` with the same prompt that produced the curl evidence above: ``` === RUN TestUpstageStreamReasoningLiveSmoke [OK] visible content: 50 chunks, 84 chars [OK] reasoning: 39 chunks, 90 chars content head 200: "\\(15\\% = \\frac{15}{100}=0.15\\).\n\n\\[\n0.15 \\times 80 = 12.\n\\]\n\n**15 % of 80 is 12.**" reasoning head 200: "We need to compute 15% of 80. That's 0.15 * 80 = 12. So answer is 12. Provide explanation." UPSTAGE STREAM REASONING SMOKE PASSED --- PASS: TestUpstageStreamReasoningLiveSmoke (1.97s) ``` Before this fix, the same call would have produced **0 reasoning chunks**. The 90 chars of reasoning that the patched driver now surfaces are the chain-of-thought solar-pro3 emits when reasoning_effort is high. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- internal/entity/models/upstage.go | 13 +++ internal/entity/models/upstage_test.go | 152 +++++++++++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/internal/entity/models/upstage.go b/internal/entity/models/upstage.go index 1b492b60e2..31379623f0 100644 --- a/internal/entity/models/upstage.go +++ b/internal/entity/models/upstage.go @@ -356,6 +356,19 @@ func (u *UpstageModel) ChatStreamlyWithSender(modelName string, messages []Messa continue } + // Reasoning chunks first, content second. Upstage's solar-pro3 + // stream interleaves both fields within the same SSE event when + // reasoning_effort is medium or high; emit reasoning before the + // visible answer so callers that pipe both into a UI see the + // chain-of-thought start before the answer, matching the wire + // ordering. solar-pro2 inlines reasoning into delta.content and + // never sets delta.reasoning, so this block is a no-op for it. + if r, ok := delta["reasoning"].(string); ok && r != "" { + if err := sender(nil, &r); err != nil { + return err + } + } + content, ok := delta["content"].(string) if ok && content != "" { if err := sender(&content, nil); err != nil { diff --git a/internal/entity/models/upstage_test.go b/internal/entity/models/upstage_test.go index cb651df94a..f9a8bba36a 100644 --- a/internal/entity/models/upstage_test.go +++ b/internal/entity/models/upstage_test.go @@ -269,3 +269,155 @@ func TestUpstageEmbedHappyPathReordersByIndex(t *testing.T) { } } } + +// ---------- streaming reasoning delta extraction ---------- + +// TestUpstageStreamExtractsReasoningDelta verifies that the SSE parser +// forwards `delta.reasoning` chunks via the sender's second arg (the +// reasonContent channel) and `delta.content` chunks via the first arg. +// Fixture matches the shape captured live from solar-pro3 with +// reasoning_effort=high — both fields appear, sometimes in the same +// chunk and sometimes separately. +func TestUpstageStreamExtractsReasoningDelta(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, + `data: {"choices":[{"index":0,"delta":{"role":"assistant"}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"reasoning":"We need "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"reasoning":"to compute. "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"reasoning":"15% = 0.15."}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"15% of 80 "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"is 12."},"finish_reason":"stop"}]}`+"\n"+ + `data: [DONE]`+"\n", + ) + })) + defer srv.Close() + + u := newUpstageForTest(srv.URL) + apiKey := "test-key" + var contentChunks, reasoningChunks []string + err := u.ChatStreamlyWithSender("solar-pro3", + []Message{{Role: "user", Content: "What is 15% of 80?"}}, + &APIConfig{ApiKey: &apiKey}, nil, + func(content *string, reason *string) error { + // At most one of (content, reason) is set per call: callers + // need this contract to route to the right UI channel. + if content != nil && reason != nil { + t.Errorf("sender called with both content and reason non-nil") + } + if content != nil && *content != "" && *content != "[DONE]" { + contentChunks = append(contentChunks, *content) + } + if reason != nil && *reason != "" { + reasoningChunks = append(reasoningChunks, *reason) + } + return nil + }, + ) + if err != nil { + t.Fatalf("stream: %v", err) + } + wantReason := "We need to compute. 15% = 0.15." + wantContent := "15% of 80 is 12." + if got := strings.Join(reasoningChunks, ""); got != wantReason { + t.Errorf("joined reasoning=%q want %q", got, wantReason) + } + if got := strings.Join(contentChunks, ""); got != wantContent { + t.Errorf("joined content=%q want %q", got, wantContent) + } + if len(reasoningChunks) != 3 { + t.Errorf("expected 3 reasoning chunks, got %d", len(reasoningChunks)) + } +} + +// TestUpstageStreamReasoningChunksArriveBeforeContent verifies the +// ordering contract inside a single SSE event when both fields are +// present: reasoning is forwarded first so a UI consuming both can +// render the chain-of-thought before the answer for that token. +func TestUpstageStreamReasoningChunksArriveBeforeContent(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, + // One SSE event carries BOTH reasoning and content in the + // same delta. The driver must forward reasoning first. + `data: {"choices":[{"index":0,"delta":{"reasoning":"R1","content":"C1"}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"C2"},"finish_reason":"stop"}]}`+"\n"+ + `data: [DONE]`+"\n", + ) + })) + defer srv.Close() + + u := newUpstageForTest(srv.URL) + apiKey := "test-key" + var seq []string + err := u.ChatStreamlyWithSender("solar-pro3", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{ApiKey: &apiKey}, nil, + func(content *string, reason *string) error { + if reason != nil && *reason != "" { + seq = append(seq, "R:"+*reason) + } + if content != nil && *content != "" && *content != "[DONE]" { + seq = append(seq, "C:"+*content) + } + return nil + }, + ) + if err != nil { + t.Fatalf("stream: %v", err) + } + wantSeq := []string{"R:R1", "C:C1", "C:C2"} + if len(seq) != len(wantSeq) { + t.Fatalf("seq=%v want %v", seq, wantSeq) + } + for i, want := range wantSeq { + if seq[i] != want { + t.Errorf("seq[%d]=%q want %q (full=%v)", i, seq[i], want, seq) + } + } +} + +// TestUpstageStreamWithoutReasoningStillWorks is the regression net: +// non-reasoning models (solar-mini, solar-pro2 with no reasoning_effort) +// emit only delta.content. The driver must not regress on them. +func TestUpstageStreamWithoutReasoningStillWorks(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, + `data: {"choices":[{"index":0,"delta":{"content":"Hello "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"world"},"finish_reason":"stop"}]}`+"\n"+ + `data: [DONE]`+"\n", + ) + })) + defer srv.Close() + + u := newUpstageForTest(srv.URL) + apiKey := "test-key" + var content []string + var reasonCalled bool + err := u.ChatStreamlyWithSender("solar-mini", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{ApiKey: &apiKey}, nil, + func(c *string, r *string) error { + if r != nil && *r != "" { + reasonCalled = true + } + if c != nil && *c != "" && *c != "[DONE]" { + content = append(content, *c) + } + return nil + }, + ) + if err != nil { + t.Fatalf("stream: %v", err) + } + if reasonCalled { + t.Errorf("reasoning callback fired despite no delta.reasoning in fixture") + } + if got := strings.Join(content, ""); got != "Hello world" { + t.Errorf("content=%q want %q", got, "Hello world") + } +}