diff --git a/conf/models/modelscope.json b/conf/models/modelscope.json new file mode 100644 index 0000000000..02a9ebe0f8 --- /dev/null +++ b/conf/models/modelscope.json @@ -0,0 +1,8 @@ +{ + "name": "modelscope", + "url_suffix": { + "chat": "v1/chat/completions", + "models": "v1/models" + }, + "class": "local" +} diff --git a/internal/entity/models/factory.go b/internal/entity/models/factory.go index 45ad2ee29b..6e263ab90f 100644 --- a/internal/entity/models/factory.go +++ b/internal/entity/models/factory.go @@ -97,6 +97,8 @@ func (f *ModelFactory) CreateModelDriver(providerName string, baseURL map[string return NewXinferenceModel(baseURL, urlSuffix), nil case "astraflow": return NewAstraflowModel(baseURL, urlSuffix), nil + case "modelscope": + return NewModelScopeModel(baseURL, urlSuffix), nil case "longcat": return NewLongCatModel(baseURL, urlSuffix), nil case "hunyuan": diff --git a/internal/entity/models/modelscope.go b/internal/entity/models/modelscope.go new file mode 100644 index 0000000000..5dccc390de --- /dev/null +++ b/internal/entity/models/modelscope.go @@ -0,0 +1,477 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package models + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" +) + +// modelscopeStreamIdleTimeout bounds how long a stream can go without +// receiving any SSE line. Self-hosted ModelScope deployments can be slow, +// but a stream that stays silent for a full minute is more useful as a +// surfaced error than as a stuck goroutine. +var modelscopeStreamIdleTimeout = 60 * time.Second + +// ModelScopeModel implements ModelDriver for ModelScope chat models. +// +// ModelScope exposes an OpenAI-compatible API under /v1. +// The tenant supplies the deployment endpoint (no default — matches the +// Python ModelScopeChat at rag/llm/chat_model.py which raises on a +// missing base URL). Both the root endpoint and the OpenAI-compatible +// endpoint (.../v1) are accepted; the driver normalizes both to the root +// before appending URLSuffix values like v1/chat/completions. +// Authentication is optional: deployments without auth ignore API keys, +// while auth-enabled deployments require Authorization: Bearer . +type ModelScopeModel struct { + BaseURL map[string]string + URLSuffix URLSuffix + httpClient *http.Client +} + +type modelscopeChatChoice struct { + Message struct { + Content string `json:"content"` + ReasoningContent string `json:"reasoning_content"` + Reasoning string `json:"reasoning"` + Thinking string `json:"thinking"` + } `json:"message"` +} + +type modelscopeChatResponse struct { + Choices []modelscopeChatChoice `json:"choices"` +} + +type modelscopeModelListResponse struct { + Data []struct { + ID string `json:"id"` + } `json:"data"` +} + +// NewModelScopeModel creates a new ModelScope model instance. +func NewModelScopeModel(baseURL map[string]string, urlSuffix URLSuffix) *ModelScopeModel { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 100 + transport.MaxIdleConnsPerHost = 10 + transport.IdleConnTimeout = 90 * time.Second + transport.DisableCompression = false + transport.ResponseHeaderTimeout = 60 * time.Second + + return &ModelScopeModel{ + BaseURL: baseURL, + URLSuffix: urlSuffix, + httpClient: &http.Client{ + Transport: transport, + }, + } +} + +func (m *ModelScopeModel) NewInstance(baseURL map[string]string) ModelDriver { + return NewModelScopeModel(baseURL, m.URLSuffix) +} + +func (m *ModelScopeModel) Name() string { + return "modelscope" +} + +func (m *ModelScopeModel) baseURLForRegion(region string) (string, error) { + if base, ok := m.BaseURL[region]; ok && strings.TrimSpace(base) != "" { + return normalizeModelScopeBaseURL(base), nil + } + if base, ok := m.BaseURL["default"]; ok && strings.TrimSpace(base) != "" { + return normalizeModelScopeBaseURL(base), nil + } + return "", fmt.Errorf("modelscope: missing base URL, configure the ModelScope endpoint (e.g., http://127.0.0.1:8000 or http://127.0.0.1:8000/v1)") +} + +func normalizeModelScopeBaseURL(base string) string { + trimmed := strings.TrimRight(strings.TrimSpace(base), "/") + if trimmed == "" { + return trimmed + } + if strings.HasSuffix(trimmed, "/v1") { + return strings.TrimSuffix(trimmed, "/v1") + } + return trimmed +} + +func setModelScopeAuth(req *http.Request, apiConfig *APIConfig) { + if apiConfig == nil || apiConfig.ApiKey == nil || *apiConfig.ApiKey == "" { + return + } + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", *apiConfig.ApiKey)) +} + +func modelscopeRegion(apiConfig *APIConfig) string { + if apiConfig != nil && apiConfig.Region != nil && *apiConfig.Region != "" { + return *apiConfig.Region + } + return "default" +} + +func modelscopeReasoningFromStrings(reasoningContent string, reasoning string, thinking string) string { + switch { + case reasoningContent != "": + return reasoningContent + case reasoning != "": + return reasoning + case thinking != "": + return thinking + default: + return "" + } +} + +func modelscopeReasoningFromMap(value map[string]interface{}) string { + for _, field := range []string{"reasoning_content", "reasoning", "thinking"} { + if text, ok := value[field].(string); ok && text != "" { + return text + } + } + return "" +} + +func buildModelScopeChatBody(modelName string, messages []Message, stream bool, chatModelConfig *ChatConfig) map[string]interface{} { + apiMessages := make([]map[string]interface{}, len(messages)) + for i, msg := range messages { + apiMessages[i] = map[string]interface{}{ + "role": msg.Role, + "content": msg.Content, + } + } + + reqBody := map[string]interface{}{ + "model": modelName, + "messages": apiMessages, + "stream": stream, + } + + if chatModelConfig != nil { + if chatModelConfig.MaxTokens != nil { + reqBody["max_tokens"] = *chatModelConfig.MaxTokens + } + if chatModelConfig.Temperature != nil { + reqBody["temperature"] = *chatModelConfig.Temperature + } + if chatModelConfig.TopP != nil { + reqBody["top_p"] = *chatModelConfig.TopP + } + if chatModelConfig.Stop != nil { + reqBody["stop"] = *chatModelConfig.Stop + } + } + + return reqBody +} + +// ChatWithMessages sends multiple messages with roles and returns the response. +func (m *ModelScopeModel) ChatWithMessages(modelName string, messages []Message, apiConfig *APIConfig, chatModelConfig *ChatConfig) (*ChatResponse, error) { + if len(messages) == 0 { + return nil, fmt.Errorf("messages is empty") + } + + baseURL, err := m.baseURLForRegion(modelscopeRegion(apiConfig)) + if err != nil { + return nil, err + } + url := fmt.Sprintf("%s/%s", baseURL, m.URLSuffix.Chat) + + reqBody := buildModelScopeChatBody(modelName, messages, false, chatModelConfig) + jsonData, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), nonStreamCallTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + setModelScopeAuth(req, apiConfig) + + resp, err := m.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result modelscopeChatResponse + if err = json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + if len(result.Choices) == 0 { + return nil, fmt.Errorf("no choices in response") + } + + content := result.Choices[0].Message.Content + reasonContent := modelscopeReasoningFromStrings( + result.Choices[0].Message.ReasoningContent, + result.Choices[0].Message.Reasoning, + result.Choices[0].Message.Thinking, + ) + + return &ChatResponse{ + Answer: &content, + ReasonContent: &reasonContent, + }, nil +} + +// ChatStreamlyWithSender sends messages and streams response via sender. +func (m *ModelScopeModel) ChatStreamlyWithSender(modelName string, messages []Message, apiConfig *APIConfig, chatModelConfig *ChatConfig, sender func(*string, *string) error) error { + if sender == nil { + return fmt.Errorf("sender is required") + } + if len(messages) == 0 { + return fmt.Errorf("messages is empty") + } + if chatModelConfig != nil && chatModelConfig.Stream != nil && !*chatModelConfig.Stream { + return fmt.Errorf("stream must be true in ChatStreamlyWithSender") + } + + baseURL, err := m.baseURLForRegion(modelscopeRegion(apiConfig)) + if err != nil { + return err + } + url := fmt.Sprintf("%s/%s", baseURL, m.URLSuffix.Chat) + + reqBody := buildModelScopeChatBody(modelName, messages, true, chatModelConfig) + jsonData, err := json.Marshal(reqBody) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + setModelScopeAuth(req, apiConfig) + + resp, err := m.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + lastActive := time.Now() + var lastActiveMu sync.Mutex + done := make(chan struct{}) + defer close(done) + go func() { + ticker := time.NewTicker(modelscopeStreamIdleTimeout / 4) + defer ticker.Stop() + for { + select { + case <-done: + return + case now := <-ticker.C: + lastActiveMu.Lock() + idle := now.Sub(lastActive) + lastActiveMu.Unlock() + if idle >= modelscopeStreamIdleTimeout { + cancel() + return + } + } + } + }() + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + sawTerminal := false + for scanner.Scan() { + lastActiveMu.Lock() + lastActive = time.Now() + lastActiveMu.Unlock() + + line := scanner.Text() + if !strings.HasPrefix(line, "data:") { + continue + } + data := strings.TrimSpace(line[5:]) + if data == "[DONE]" { + sawTerminal = true + break + } + + var event map[string]interface{} + if err = json.Unmarshal([]byte(data), &event); err != nil { + continue + } + + choices, ok := event["choices"].([]interface{}) + if !ok || len(choices) == 0 { + continue + } + firstChoice, ok := choices[0].(map[string]interface{}) + if !ok { + continue + } + + if delta, ok := firstChoice["delta"].(map[string]interface{}); ok { + if reasoning := modelscopeReasoningFromMap(delta); reasoning != "" { + if err := sender(nil, &reasoning); err != nil { + return err + } + } + if content, ok := delta["content"].(string); ok && content != "" { + if err := sender(&content, nil); err != nil { + return err + } + } + } + + if finishReason, ok := firstChoice["finish_reason"].(string); ok && finishReason != "" { + sawTerminal = true + break + } + } + + if err := scanner.Err(); err != nil { + if ctx.Err() != nil { + return fmt.Errorf("modelscope: stream idle for more than %s, aborted", modelscopeStreamIdleTimeout) + } + return fmt.Errorf("failed to scan response body: %w", err) + } + if !sawTerminal { + return fmt.Errorf("modelscope: stream ended before [DONE] or finish_reason") + } + + endOfStream := "[DONE]" + return sender(&endOfStream, nil) +} + +func (m *ModelScopeModel) Embed(modelName *string, texts []string, apiConfig *APIConfig, embeddingConfig *EmbeddingConfig) ([]EmbeddingData, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) Rerank(modelName *string, query string, documents []string, apiConfig *APIConfig, rerankConfig *RerankConfig) (*RerankResponse, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) TranscribeAudio(modelName *string, file *string, apiConfig *APIConfig, asrConfig *ASRConfig) (*ASRResponse, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) TranscribeAudioWithSender(modelName *string, file *string, apiConfig *APIConfig, asrConfig *ASRConfig, sender func(*string, *string) error) error { + return fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) AudioSpeech(modelName *string, audioContent *string, apiConfig *APIConfig, asrConfig *TTSConfig) (*TTSResponse, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) AudioSpeechWithSender(modelName *string, audioContent *string, apiConfig *APIConfig, ttsConfig *TTSConfig, sender func(*string, *string) error) error { + return fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) OCRFile(modelName *string, content []byte, url *string, apiConfig *APIConfig, ocrConfig *OCRConfig) (*OCRFileResponse, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) ParseFile(modelName *string, content []byte, url *string, apiConfig *APIConfig, parseFileConfig *ParseFileConfig) (*ParseFileResponse, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +// ListModels returns the model IDs exposed by ModelScope's OpenAI-compatible +// /v1/models endpoint. +func (m *ModelScopeModel) ListModels(apiConfig *APIConfig) ([]string, error) { + baseURL, err := m.baseURLForRegion(modelscopeRegion(apiConfig)) + if err != nil { + return nil, err + } + url := fmt.Sprintf("%s/%s", baseURL, m.URLSuffix.Models) + + ctx, cancel := context.WithTimeout(context.Background(), nonStreamCallTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + setModelScopeAuth(req, apiConfig) + + resp, err := m.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result modelscopeModelListResponse + if err = json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + models := make([]string, 0, len(result.Data)) + for _, model := range result.Data { + if model.ID != "" { + models = append(models, model.ID) + } + } + return models, nil +} + +func (m *ModelScopeModel) Balance(apiConfig *APIConfig) (map[string]interface{}, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) CheckConnection(apiConfig *APIConfig) error { + _, err := m.ListModels(apiConfig) + return err +} + +func (m *ModelScopeModel) ListTasks(apiConfig *APIConfig) ([]ListTaskStatus, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} + +func (m *ModelScopeModel) ShowTask(taskID string, apiConfig *APIConfig) (*TaskResponse, error) { + return nil, fmt.Errorf("%s, no such method", m.Name()) +} diff --git a/internal/entity/models/modelscope_test.go b/internal/entity/models/modelscope_test.go new file mode 100644 index 0000000000..5a698f569a --- /dev/null +++ b/internal/entity/models/modelscope_test.go @@ -0,0 +1,343 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package models + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func newModelScopeForTest(baseURL string) *ModelScopeModel { + return NewModelScopeModel( + map[string]string{"default": baseURL}, + URLSuffix{ + Chat: "v1/chat/completions", + Models: "v1/models", + }, + ) +} + +func withModelScopeIdleTimeout(t *testing.T, d time.Duration) { + t.Helper() + original := modelscopeStreamIdleTimeout + modelscopeStreamIdleTimeout = d + t.Cleanup(func() { + modelscopeStreamIdleTimeout = original + }) +} + +func TestModelScopeName(t *testing.T) { + m := newModelScopeForTest("http://unused") + if got := m.Name(); got != "modelscope" { + t.Errorf("Name()=%q, want %q", got, "modelscope") + } +} + +func TestNormalizeModelScopeBaseURL(t *testing.T) { + cases := []struct { + in string + want string + }{ + {"http://127.0.0.1:8000", "http://127.0.0.1:8000"}, + {"http://127.0.0.1:8000/", "http://127.0.0.1:8000"}, + {"http://127.0.0.1:8000/v1", "http://127.0.0.1:8000"}, + {" http://127.0.0.1:8000/v1/ ", "http://127.0.0.1:8000"}, + } + for _, tc := range cases { + if got := normalizeModelScopeBaseURL(tc.in); got != tc.want { + t.Errorf("normalizeModelScopeBaseURL(%q)=%q, want %q", tc.in, got, tc.want) + } + } +} + +func TestModelScopeFactoryRoute(t *testing.T) { + driver, err := NewModelFactory().CreateModelDriver("modelscope", map[string]string{"default": "http://unused"}, URLSuffix{}) + if err != nil { + t.Fatalf("CreateModelDriver: %v", err) + } + if driver.Name() != "modelscope" { + t.Errorf("driver.Name()=%q, want modelscope", driver.Name()) + } +} + +func TestModelScopeChatHappyPathNormalizesBaseURLAndOmitsEmptyAuth(t *testing.T) { + var seen map[string]interface{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/chat/completions" { + t.Errorf("path=%s, want /v1/chat/completions", r.URL.Path) + } + if got := r.Header.Get("Authorization"); got != "" { + t.Errorf("expected no Authorization header, got %q", got) + } + raw, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("read body: %v", err) + http.Error(w, "read error", http.StatusBadRequest) + return + } + if err := json.Unmarshal(raw, &seen); err != nil { + t.Errorf("unmarshal request: %v", err) + http.Error(w, "unmarshal error", http.StatusBadRequest) + return + } + _, _ = io.WriteString(w, `{"choices":[{"message":{"content":"pong"}}]}`) + })) + defer srv.Close() + + m := newModelScopeForTest(srv.URL) + maxTokens := 32 + temp := 0.2 + resp, err := m.ChatWithMessages("Qwen/Qwen2.5-7B-Instruct", + []Message{{Role: "user", Content: "ping"}}, + &APIConfig{}, + &ChatConfig{MaxTokens: &maxTokens, Temperature: &temp}) + if err != nil { + t.Fatalf("ChatWithMessages: %v", err) + } + if resp.Answer == nil || *resp.Answer != "pong" { + t.Fatalf("Answer=%v, want pong", resp.Answer) + } + if seen["model"] != "Qwen/Qwen2.5-7B-Instruct" { + t.Errorf("model=%v, want Qwen/Qwen2.5-7B-Instruct", seen["model"]) + } + if seen["stream"] != false { + t.Errorf("stream=%v, want false", seen["stream"]) + } + if seen["max_tokens"] != float64(32) { + t.Errorf("max_tokens=%v, want 32", seen["max_tokens"]) + } + if seen["temperature"] != 0.2 { + t.Errorf("temperature=%v, want 0.2", seen["temperature"]) + } +} + +func TestModelScopeChatSendsAuthHeaderWhenKeyProvided(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if got := r.Header.Get("Authorization"); got != "Bearer ms-test" { + t.Errorf("Authorization=%q, want Bearer ms-test", got) + } + _, _ = io.WriteString(w, `{"choices":[{"message":{"content":"ok"}}]}`) + })) + defer srv.Close() + + m := newModelScopeForTest(srv.URL + "/v1") + key := "ms-test" + _, err := m.ChatWithMessages("Qwen/Qwen2.5-7B-Instruct", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{ApiKey: &key}, nil) + if err != nil { + t.Fatalf("ChatWithMessages: %v", err) + } +} + +func TestModelScopeChatExtractsReasoningFields(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = io.WriteString(w, `{"choices":[{"message":{ + "content":"12", + "reasoning_content":"0.15 * 80 = 12" + }}]}`) + })) + defer srv.Close() + + m := newModelScopeForTest(srv.URL) + resp, err := m.ChatWithMessages("Qwen/Qwen3-8B", + []Message{{Role: "user", Content: "15% of 80?"}}, + &APIConfig{}, nil) + if err != nil { + t.Fatalf("ChatWithMessages: %v", err) + } + if resp.ReasonContent == nil || *resp.ReasonContent != "0.15 * 80 = 12" { + t.Errorf("ReasonContent=%v", resp.ReasonContent) + } +} + +func TestModelScopeStreamHappyPath(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/chat/completions" { + t.Errorf("path=%s", r.URL.Path) + } + var seen map[string]interface{} + raw, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("read body: %v", err) + http.Error(w, "read error", http.StatusBadRequest) + return + } + if err := json.Unmarshal(raw, &seen); err != nil { + t.Errorf("unmarshal request: %v", err) + http.Error(w, "unmarshal error", http.StatusBadRequest) + return + } + if seen["stream"] != true { + t.Errorf("stream=%v, want true", seen["stream"]) + } + w.Header().Set("Content-Type", "text/event-stream") + _, _ = io.WriteString(w, + `data: {"choices":[{"delta":{"reasoning_content":"step. "}}]}`+"\n"+ + `data: {"choices":[{"delta":{"content":"Hello"}}]}`+"\n"+ + `data: {"choices":[{"delta":{"content":" world"},"finish_reason":"stop"}]}`+"\n"+ + `data: [DONE]`+"\n", + ) + })) + defer srv.Close() + + m := newModelScopeForTest(srv.URL) + var content []string + var reasoning []string + var sawDone bool + err := m.ChatStreamlyWithSender("Qwen/Qwen2.5-7B-Instruct", + []Message{{Role: "user", Content: "hi"}}, + &APIConfig{}, nil, + func(c *string, r *string) error { + if r != nil && *r != "" { + reasoning = append(reasoning, *r) + } + if c != nil && *c == "[DONE]" { + sawDone = true + } + if c != nil && *c != "" && *c != "[DONE]" { + content = append(content, *c) + } + return nil + }) + if err != nil { + t.Fatalf("ChatStreamlyWithSender: %v", err) + } + if strings.Join(reasoning, "") != "step. " { + t.Errorf("reasoning=%q", strings.Join(reasoning, "")) + } + if strings.Join(content, "") != "Hello world" { + t.Errorf("content=%q", strings.Join(content, "")) + } + if !sawDone { + t.Error("expected [DONE] callback") + } +} + +func TestModelScopeStreamRejectsFalseStreamConfig(t *testing.T) { + m := newModelScopeForTest("http://unused") + stream := false + err := m.ChatStreamlyWithSender("Qwen/Qwen2.5-7B-Instruct", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{}, + &ChatConfig{Stream: &stream}, + func(*string, *string) error { return nil }) + if err == nil || !strings.Contains(err.Error(), "stream must be true") { + t.Errorf("expected stream-must-be-true error, got %v", err) + } +} + +func TestModelScopeStreamCancelsOnIdle(t *testing.T) { + withModelScopeIdleTimeout(t, 200*time.Millisecond) + + hold := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + if f, ok := w.(http.Flusher); ok { + _, _ = io.WriteString(w, `data: {"choices":[{"delta":{"content":"hi"}}]}`+"\n") + f.Flush() + } + select { + case <-hold: + case <-r.Context().Done(): + } + })) + t.Cleanup(srv.Close) + t.Cleanup(func() { close(hold) }) + + m := newModelScopeForTest(srv.URL) + err := m.ChatStreamlyWithSender("Qwen/Qwen2.5-7B-Instruct", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{}, nil, + func(*string, *string) error { return nil }) + if err == nil || !strings.Contains(err.Error(), "stream idle") { + t.Errorf("expected stream-idle error, got %v", err) + } +} + +func TestModelScopeListModelsAndCheckConnection(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/models" { + t.Errorf("path=%s, want /v1/models", r.URL.Path) + } + if got := r.Header.Get("Authorization"); got != "Bearer ms-test" { + t.Errorf("Authorization=%q, want Bearer ms-test", got) + } + _, _ = io.WriteString(w, `{"object":"list","data":[{"id":"Qwen/Qwen2.5-7B-Instruct"},{"id":"Qwen/Qwen3-8B"}]}`) + })) + defer srv.Close() + + m := newModelScopeForTest(srv.URL) + key := "ms-test" + apiConfig := &APIConfig{ApiKey: &key} + models, err := m.ListModels(apiConfig) + if err != nil { + t.Fatalf("ListModels: %v", err) + } + if strings.Join(models, ",") != "Qwen/Qwen2.5-7B-Instruct,Qwen/Qwen3-8B" { + t.Errorf("models=%v", models) + } + if err := m.CheckConnection(apiConfig); err != nil { + t.Fatalf("CheckConnection: %v", err) + } +} + +func TestModelScopeMissingBaseURLFailsClearly(t *testing.T) { + m := NewModelScopeModel(map[string]string{}, URLSuffix{Chat: "v1/chat/completions"}) + _, err := m.ChatWithMessages("Qwen/Qwen2.5-7B-Instruct", + []Message{{Role: "user", Content: "x"}}, + &APIConfig{}, nil) + if err == nil || !strings.Contains(err.Error(), "missing base URL") { + t.Errorf("expected missing-base-URL error, got %v", err) + } +} + +func TestModelScopeUnsupportedMethodsReturnNoSuchMethod(t *testing.T) { + m := newModelScopeForTest("http://unused") + model := "Qwen/Qwen2.5-7B-Instruct" + + if _, err := m.Embed(&model, []string{"x"}, &APIConfig{}, nil); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("Embed: expected no such method, got %v", err) + } + if _, err := m.Rerank(&model, "q", []string{"d"}, &APIConfig{}, nil); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("Rerank: expected no such method, got %v", err) + } + if _, err := m.Balance(&APIConfig{}); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("Balance: expected no such method, got %v", err) + } + if _, err := m.TranscribeAudio(&model, nil, &APIConfig{}, nil); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("TranscribeAudio: expected no such method, got %v", err) + } + if err := m.TranscribeAudioWithSender(&model, nil, &APIConfig{}, nil, nil); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("TranscribeAudioWithSender: expected no such method, got %v", err) + } + if _, err := m.AudioSpeech(&model, nil, &APIConfig{}, nil); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("AudioSpeech: expected no such method, got %v", err) + } + if err := m.AudioSpeechWithSender(&model, nil, &APIConfig{}, nil, nil); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("AudioSpeechWithSender: expected no such method, got %v", err) + } + if _, err := m.OCRFile(&model, nil, nil, &APIConfig{}, nil); err == nil || !strings.Contains(err.Error(), "no such method") { + t.Errorf("OCRFile: expected no such method, got %v", err) + } +} diff --git a/internal/service/llm.go b/internal/service/llm.go index de3324959d..b15f0af2fd 100644 --- a/internal/service/llm.go +++ b/internal/service/llm.go @@ -169,6 +169,7 @@ func (s *LLMService) ListLLMs(tenantID string, modelType string) (ListLLMsRespon "LocalAI": true, "LM-Studio": true, "GPUStack": true, + "ModelScope": true, } objs, err := s.tenantLLMDAO.ListAllByTenant(tenantID)