diff --git a/internal/entity/models/upstage.go b/internal/entity/models/upstage.go index 1b492b60e2..31379623f0 100644 --- a/internal/entity/models/upstage.go +++ b/internal/entity/models/upstage.go @@ -356,6 +356,19 @@ func (u *UpstageModel) ChatStreamlyWithSender(modelName string, messages []Messa continue } + // Reasoning chunks first, content second. Upstage's solar-pro3 + // stream interleaves both fields within the same SSE event when + // reasoning_effort is medium or high; emit reasoning before the + // visible answer so callers that pipe both into a UI see the + // chain-of-thought start before the answer, matching the wire + // ordering. solar-pro2 inlines reasoning into delta.content and + // never sets delta.reasoning, so this block is a no-op for it. + if r, ok := delta["reasoning"].(string); ok && r != "" { + if err := sender(nil, &r); err != nil { + return err + } + } + content, ok := delta["content"].(string) if ok && content != "" { if err := sender(&content, nil); err != nil { diff --git a/internal/entity/models/upstage_test.go b/internal/entity/models/upstage_test.go index cb651df94a..f9a8bba36a 100644 --- a/internal/entity/models/upstage_test.go +++ b/internal/entity/models/upstage_test.go @@ -269,3 +269,155 @@ func TestUpstageEmbedHappyPathReordersByIndex(t *testing.T) { } } } + +// ---------- streaming reasoning delta extraction ---------- + +// TestUpstageStreamExtractsReasoningDelta verifies that the SSE parser +// forwards `delta.reasoning` chunks via the sender's second arg (the +// reasonContent channel) and `delta.content` chunks via the first arg. +// Fixture matches the shape captured live from solar-pro3 with +// reasoning_effort=high — both fields appear, sometimes in the same +// chunk and sometimes separately. +func TestUpstageStreamExtractsReasoningDelta(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, + `data: {"choices":[{"index":0,"delta":{"role":"assistant"}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"reasoning":"We need "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"reasoning":"to compute. "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"reasoning":"15% = 0.15."}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"15% of 80 "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"is 12."},"finish_reason":"stop"}]}`+"\n"+ + `data: [DONE]`+"\n", + ) + })) + defer srv.Close() + + u := newUpstageForTest(srv.URL) + apiKey := "test-key" + var contentChunks, reasoningChunks []string + err := u.ChatStreamlyWithSender("solar-pro3", + []Message{{Role: "user", Content: "What is 15% of 80?"}}, + &APIConfig{ApiKey: &apiKey}, nil, + func(content *string, reason *string) error { + // At most one of (content, reason) is set per call: callers + // need this contract to route to the right UI channel. + if content != nil && reason != nil { + t.Errorf("sender called with both content and reason non-nil") + } + if content != nil && *content != "" && *content != "[DONE]" { + contentChunks = append(contentChunks, *content) + } + if reason != nil && *reason != "" { + reasoningChunks = append(reasoningChunks, *reason) + } + return nil + }, + ) + if err != nil { + t.Fatalf("stream: %v", err) + } + wantReason := "We need to compute. 15% = 0.15." + wantContent := "15% of 80 is 12." + if got := strings.Join(reasoningChunks, ""); got != wantReason { + t.Errorf("joined reasoning=%q want %q", got, wantReason) + } + if got := strings.Join(contentChunks, ""); got != wantContent { + t.Errorf("joined content=%q want %q", got, wantContent) + } + if len(reasoningChunks) != 3 { + t.Errorf("expected 3 reasoning chunks, got %d", len(reasoningChunks)) + } +} + +// TestUpstageStreamReasoningChunksArriveBeforeContent verifies the +// ordering contract inside a single SSE event when both fields are +// present: reasoning is forwarded first so a UI consuming both can +// render the chain-of-thought before the answer for that token. +func TestUpstageStreamReasoningChunksArriveBeforeContent(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, + // One SSE event carries BOTH reasoning and content in the + // same delta. The driver must forward reasoning first. + `data: {"choices":[{"index":0,"delta":{"reasoning":"R1","content":"C1"}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"C2"},"finish_reason":"stop"}]}`+"\n"+ + `data: [DONE]`+"\n", + ) + })) + defer srv.Close() + + u := newUpstageForTest(srv.URL) + apiKey := "test-key" + var seq []string + err := u.ChatStreamlyWithSender("solar-pro3", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{ApiKey: &apiKey}, nil, + func(content *string, reason *string) error { + if reason != nil && *reason != "" { + seq = append(seq, "R:"+*reason) + } + if content != nil && *content != "" && *content != "[DONE]" { + seq = append(seq, "C:"+*content) + } + return nil + }, + ) + if err != nil { + t.Fatalf("stream: %v", err) + } + wantSeq := []string{"R:R1", "C:C1", "C:C2"} + if len(seq) != len(wantSeq) { + t.Fatalf("seq=%v want %v", seq, wantSeq) + } + for i, want := range wantSeq { + if seq[i] != want { + t.Errorf("seq[%d]=%q want %q (full=%v)", i, seq[i], want, seq) + } + } +} + +// TestUpstageStreamWithoutReasoningStillWorks is the regression net: +// non-reasoning models (solar-mini, solar-pro2 with no reasoning_effort) +// emit only delta.content. The driver must not regress on them. +func TestUpstageStreamWithoutReasoningStillWorks(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, + `data: {"choices":[{"index":0,"delta":{"content":"Hello "}}]}`+"\n"+ + `data: {"choices":[{"index":0,"delta":{"content":"world"},"finish_reason":"stop"}]}`+"\n"+ + `data: [DONE]`+"\n", + ) + })) + defer srv.Close() + + u := newUpstageForTest(srv.URL) + apiKey := "test-key" + var content []string + var reasonCalled bool + err := u.ChatStreamlyWithSender("solar-mini", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{ApiKey: &apiKey}, nil, + func(c *string, r *string) error { + if r != nil && *r != "" { + reasonCalled = true + } + if c != nil && *c != "" && *c != "[DONE]" { + content = append(content, *c) + } + return nil + }, + ) + if err != nil { + t.Fatalf("stream: %v", err) + } + if reasonCalled { + t.Errorf("reasoning callback fired despite no delta.reasoning in fixture") + } + if got := strings.Join(content, ""); got != "Hello world" { + t.Errorf("content=%q want %q", got, "Hello world") + } +}