From c3eac4103a0408f9b8d25948e625e58821b5d54a Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 27 Apr 2026 14:53:33 +0800 Subject: [PATCH] Go: aliyun model provider (#14379) ### What problem does this PR solve? As title. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai --- conf/models/aliyun.json | 31 ++ internal/entity/model.go | 12 +- internal/entity/models/aliyun.go | 421 ++++++++++++++++++++++++++ internal/entity/models/common.go | 4 +- internal/entity/models/factory.go | 2 + internal/entity/models/gitee.go | 10 +- internal/entity/models/siliconflow.go | 10 +- internal/entity/models/types.go | 2 +- internal/service/model_service.go | 2 +- 9 files changed, 474 insertions(+), 20 deletions(-) create mode 100644 conf/models/aliyun.json create mode 100644 internal/entity/models/aliyun.go diff --git a/conf/models/aliyun.json b/conf/models/aliyun.json new file mode 100644 index 0000000000..521732c75d --- /dev/null +++ b/conf/models/aliyun.json @@ -0,0 +1,31 @@ +{ + "name": "Aliyun", + "url": { + "default": "https://dashscope.aliyuncs.com", + "singapore": "https://dashscope-intl.aliyuncs.com", + "us": "https://dashscope-us.aliyuncs.com" + }, + "url_suffix": { + "chat": "compatible-mode/v1/chat/completions", + "embedding": "compatible-mode/v1/embeddings", + "models": "api/v1/deployments/models" + }, + "series": "deepseek", + "models": [ + { + "name": "qwen-flash", + "max_tokens": 995904, + "model_types": [ + "chat" + ] + } + ], + "features": { + "thinking": { + "default_value": true, + "supported_models": [ + "qwen-flash" + ] + } + } +} \ No newline at end of file diff --git a/internal/entity/model.go b/internal/entity/model.go index e1844d9b78..17fc58fc64 100644 --- a/internal/entity/model.go +++ b/internal/entity/model.go @@ -159,7 +159,7 @@ type Model struct { MaxTokens int `json:"max_tokens"` ModelTypes []string `json:"model_types"` Thinking *ModelThinking `json:"thinking"` - Series *string `json:"series"` + Type *string `json:"type"` ModelTypeMap map[string]bool } @@ -170,7 +170,7 @@ type Provider struct { URLSuffix models.URLSuffix `json:"url_suffix"` Models []*Model `json:"models"` Features Features `json:"features"` - Series string `json:"series"` + Type string `json:"type"` ModelDriver models.ModelDriver } @@ -257,12 +257,12 @@ func NewProviderManager(dirPath string) (*ProviderManager, error) { } } - if provider.Series == "" { + if provider.Type == "" { pos := strings.Index(model.Name, "-") - modelSeries := model.Name[0:pos] - model.Series = &modelSeries + modelType := model.Name[0:pos] + model.Type = &modelType } else { - model.Series = &provider.Name + model.Type = &provider.Name } model.ModelTypeMap = make(map[string]bool) diff --git a/internal/entity/models/aliyun.go b/internal/entity/models/aliyun.go new file mode 100644 index 0000000000..f3ed09a68a --- /dev/null +++ b/internal/entity/models/aliyun.go @@ -0,0 +1,421 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package models + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "ragflow/internal/logger" + "strings" + "time" +) + +// AliyunModel implements ModelDriver for Aliyun +type AliyunModel struct { + BaseURL map[string]string + URLSuffix URLSuffix + httpClient *http.Client // Reusable HTTP client with connection pool +} + +// NewAliyunModel creates a new Aliyun model instance +func NewAliyunModel(baseURL map[string]string, urlSuffix URLSuffix) *AliyunModel { + return &AliyunModel{ + BaseURL: baseURL, + URLSuffix: urlSuffix, + httpClient: &http.Client{ + Timeout: 120 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + DisableCompression: false, + }, + }, + } +} + +func (z *AliyunModel) Name() string { + return "siliconflow" +} + +// Chat sends a message and returns response +func (z *AliyunModel) Chat(modelName, message *string, apiConfig *APIConfig, chatModelConfig *ChatConfig) (*ChatResponse, error) { + if message == nil { + return nil, fmt.Errorf("message is nil") + } + + var region = "default" + if apiConfig.Region != nil { + region = *apiConfig.Region + } + + url := fmt.Sprintf("%s/%s", z.BaseURL[region], z.URLSuffix.Chat) + + // Build request body + reqBody := map[string]interface{}{ + "model": modelName, + "messages": []map[string]string{ + {"role": "user", "content": *message}, + }, + "stream": false, + "temperature": 1, + } + + if chatModelConfig.Stream != nil { + reqBody["stream"] = *chatModelConfig.Stream + } + + if chatModelConfig.MaxTokens != nil { + reqBody["max_tokens"] = *chatModelConfig.MaxTokens + } + + if chatModelConfig.Temperature != nil { + reqBody["temperature"] = *chatModelConfig.Temperature + } + + if chatModelConfig.TopP != nil { + reqBody["top_p"] = *chatModelConfig.TopP + } + + if chatModelConfig.Stop != nil { + reqBody["stop"] = *chatModelConfig.Stop + } + + if chatModelConfig.Thinking != nil { + if *chatModelConfig.Thinking { + reqBody["enable_thinking"] = true + } else { + reqBody["enable_thinking"] = false + } + } + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", *apiConfig.ApiKey)) + + resp, err := z.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var result map[string]interface{} + if err = json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + choices, ok := result["choices"].([]interface{}) + if !ok || len(choices) == 0 { + return nil, fmt.Errorf("no choices in response") + } + + firstChoice, ok := choices[0].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid choice format") + } + + messageMap, ok := firstChoice["message"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid message format") + } + + answer, ok := messageMap["content"].(string) + if !ok { + return nil, fmt.Errorf("invalid content format") + } + + var reasonContent string + if chatModelConfig.Thinking != nil && *chatModelConfig.Thinking { + reasonContent, ok = messageMap["reasoning_content"].(string) + if !ok { + return nil, fmt.Errorf("invalid content format") + } + // if first char of reasonContent is \n remove the '\n' + if reasonContent != "" && reasonContent[0] == '\n' { + reasonContent = reasonContent[1:] + } + } + + //thinking, answer := GetThinkingAndAnswer(chatModelConfig.ModelType, &content) + + chatResponse := &ChatResponse{ + Answer: &answer, + ReasonContent: &reasonContent, + } + + return chatResponse, nil +} + +// ChatWithMessages sends multiple messages with roles and returns response +func (z *AliyunModel) ChatWithMessages(modelName string, apiKey *string, messages []Message, chatModelConfig *ChatConfig) (string, error) { + return "", fmt.Errorf("%s, ChatWithMessages not implemented", z.Name()) +} + +// ChatStreamlyWithSender sends a message and streams response via sender function (best performance, no channel) +func (z *AliyunModel) ChatStreamlyWithSender(modelName, message *string, apiConfig *APIConfig, chatModelConfig *ChatConfig, sender func(*string, *string) error) error { + var region = "default" + if apiConfig.Region != nil { + region = *apiConfig.Region + } + + url := fmt.Sprintf("%s/%s", z.BaseURL[region], z.URLSuffix.Chat) + + // Build request body with streaming enabled + reqBody := map[string]interface{}{ + "model": modelName, + "messages": []map[string]string{ + {"role": "user", "content": *message}, + }, + "stream": false, + "temperature": 1, + } + + if chatModelConfig.Stream != nil { + reqBody["stream"] = *chatModelConfig.Stream + } + + if chatModelConfig.MaxTokens != nil { + reqBody["max_tokens"] = *chatModelConfig.MaxTokens + } + + if chatModelConfig.Temperature != nil { + reqBody["temperature"] = *chatModelConfig.Temperature + } + + if chatModelConfig.DoSample != nil { + reqBody["do_sample"] = *chatModelConfig.DoSample + } + + if chatModelConfig.TopP != nil { + reqBody["top_p"] = *chatModelConfig.TopP + } + + if chatModelConfig.Stop != nil { + reqBody["stop"] = *chatModelConfig.Stop + } + + if chatModelConfig.Thinking != nil { + if *chatModelConfig.Thinking { + reqBody["enable_thinking"] = true + } else { + reqBody["enable_thinking"] = false + } + } + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", *apiConfig.ApiKey)) + + resp, err := z.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + // SSE parsing: read line by line + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + logger.Info(line) + + // SSE data line starts with "data:" + if !strings.HasPrefix(line, "data:") { + continue + } + + // Extract JSON after "data:" + data := strings.TrimSpace(line[5:]) + + // [DONE] marks the end of stream + if data == "[DONE]" { + break + } + + // Parse the JSON event + var event map[string]interface{} + if err = json.Unmarshal([]byte(data), &event); err != nil { + continue + } + + choices, ok := event["choices"].([]interface{}) + if !ok || len(choices) == 0 { + continue + } + + firstChoice, ok := choices[0].(map[string]interface{}) + if !ok { + continue + } + + delta, ok := firstChoice["delta"].(map[string]interface{}) + if !ok { + continue + } + + content, ok := delta["content"].(string) + if ok && content != "" { + if err := sender(&content, nil); err != nil { + return err + } + } + + reasoningContent, ok := delta["reasoning_content"].(string) + if ok && reasoningContent != "" { + if err := sender(nil, &reasoningContent); err != nil { + return err + } + } + + finishReason, ok := firstChoice["finish_reason"].(string) + if ok && finishReason != "" { + break + } + } + + // Send [DONE] marker for OpenAI compatibility + endOfStream := "[DONE]" + if err = sender(&endOfStream, nil); err != nil { + return err + } + + return scanner.Err() +} + +// EncodeToEmbedding encodes a list of texts into embeddings +func (z *AliyunModel) EncodeToEmbedding(modelName *string, texts []string, apiConfig *APIConfig, embeddingConfig *EmbeddingConfig) ([][]float64, error) { + return nil, fmt.Errorf("%s, no such method", z.Name()) +} + +type AliyunModelItem struct { + ModelName string `json:"model_name"` + BaseCapacity int `json:"base_capacity"` +} + +type AliyunModelOutput struct { + Models []AliyunModelItem `json:"models"` + PageNo int `json:"page_no"` + PageSize int `json:"page_size"` + Total int `json:"total"` +} + +type AliyunModelList struct { + RequestID string `json:"request_id"` + Output AliyunModelOutput `json:"output"` +} + +func (z *AliyunModel) ListModels(apiConfig *APIConfig) ([]string, error) { + var region = "default" + if apiConfig.Region != nil { + region = *apiConfig.Region + } + + url := fmt.Sprintf("%s/%s", z.BaseURL[region], z.URLSuffix.Models) + + // Build request body + reqBody := map[string]interface{}{} + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequest("GET", url, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", *apiConfig.ApiKey)) + + resp, err := z.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var modelList AliyunModelList + if err = json.Unmarshal(body, &modelList); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + var models []string + for _, model := range modelList.Output.Models { + modelName := model.ModelName + models = append(models, modelName) + } + + return models, nil +} + +func (z *AliyunModel) Balance(apiConfig *APIConfig) (map[string]interface{}, error) { + return nil, fmt.Errorf("%s, no such method", z.Name()) +} + +func (z *AliyunModel) CheckConnection(apiConfig *APIConfig) error { + _, err := z.ListModels(apiConfig) + if err != nil { + return err + } + return nil +} diff --git a/internal/entity/models/common.go b/internal/entity/models/common.go index dd8fd62da5..4b1b093167 100644 --- a/internal/entity/models/common.go +++ b/internal/entity/models/common.go @@ -18,8 +18,8 @@ package models import "strings" -func GetThinkingAndAnswer(modelSeries *string, content *string) (*string, *string) { - switch *modelSeries { +func GetThinkingAndAnswer(modelType *string, content *string) (*string, *string) { + switch *modelType { case "qwen3": return extractThinkContent(content) } diff --git a/internal/entity/models/factory.go b/internal/entity/models/factory.go index d03a020ff1..003a88b225 100644 --- a/internal/entity/models/factory.go +++ b/internal/entity/models/factory.go @@ -45,6 +45,8 @@ func (f *ModelFactory) CreateModelDriver(providerName string, baseURL map[string return NewGiteeModel(baseURL, urlSuffix), nil case "siliconflow": return NewSiliconflowModel(baseURL, urlSuffix), nil + case "aliyun": + return NewAliyunModel(baseURL, urlSuffix), nil default: return NewDummyModel(baseURL, urlSuffix), nil } diff --git a/internal/entity/models/gitee.go b/internal/entity/models/gitee.go index f1eb7058dd..35cc7ef8ca 100644 --- a/internal/entity/models/gitee.go +++ b/internal/entity/models/gitee.go @@ -69,10 +69,10 @@ func (z *GiteeModel) Chat(modelName, message *string, apiConfig *APIConfig, chat url := fmt.Sprintf("%s/%s", z.BaseURL[region], z.URLSuffix.Chat) - // I need to get the model series, such as qwen3 is the prefix, the model series will be qwen. glm is the prefix, the model series will be glm. such as the model name: qwen3-0.6b, the model series will be qwen3 - // the model name is glm-4.7, the model series will be glm - modelSeries := strings.Split(*modelName, "-")[0] - if modelSeries == "qwen" || modelSeries == "glm" { + // I need to get the model type, such as qwen3 is the prefix, the model type will be qwen. glm is the prefix, the model type will be glm. such as the model name: qwen3-0.6b, the model type will be qwen3 + // the model name is glm-4.7, the model type will be glm + modelType := strings.Split(*modelName, "-")[0] + if modelType == "qwen" || modelType == "glm" { url = fmt.Sprintf("%s/%s", z.BaseURL[region], z.URLSuffix.AsyncChat) } @@ -172,7 +172,7 @@ func (z *GiteeModel) Chat(modelName, message *string, apiConfig *APIConfig, chat return nil, fmt.Errorf("invalid content format") } - thinking, answer := GetThinkingAndAnswer(chatModelConfig.ModelSeries, &content) + thinking, answer := GetThinkingAndAnswer(chatModelConfig.ModelType, &content) chatResponse := &ChatResponse{ Answer: answer, diff --git a/internal/entity/models/siliconflow.go b/internal/entity/models/siliconflow.go index f4a6c0ef78..8edb0e7436 100644 --- a/internal/entity/models/siliconflow.go +++ b/internal/entity/models/siliconflow.go @@ -69,10 +69,10 @@ func (z *SiliconflowModel) Chat(modelName, message *string, apiConfig *APIConfig url := fmt.Sprintf("%s/%s", z.BaseURL[region], z.URLSuffix.Chat) - // I need to get the model series, such as qwen3 is the prefix, the model series will be qwen. glm is the prefix, the model series will be glm. such as the model name: qwen3-0.6b, the model series will be qwen3 - // the model name is glm-4.7, the model series will be glm - modelSeries := strings.Split(*modelName, "-")[0] - if modelSeries == "qwen" || modelSeries == "glm" { + // I need to get the model type, such as qwen3 is the prefix, the model type will be qwen. glm is the prefix, the model type will be glm. such as the model name: qwen3-0.6b, the model type will be qwen3 + // the model name is glm-4.7, the model type will be glm + modelType := strings.Split(*modelName, "-")[0] + if modelType == "qwen" || modelType == "glm" { url = fmt.Sprintf("%s/%s", z.BaseURL[region], z.URLSuffix.AsyncChat) } @@ -172,7 +172,7 @@ func (z *SiliconflowModel) Chat(modelName, message *string, apiConfig *APIConfig return nil, fmt.Errorf("invalid content format") } - thinking, answer := GetThinkingAndAnswer(chatModelConfig.ModelSeries, &content) + thinking, answer := GetThinkingAndAnswer(chatModelConfig.ModelType, &content) chatResponse := &ChatResponse{ Answer: answer, diff --git a/internal/entity/models/types.go b/internal/entity/models/types.go index d9461aaf7d..1163a438e7 100644 --- a/internal/entity/models/types.go +++ b/internal/entity/models/types.go @@ -52,7 +52,7 @@ type ChatConfig struct { TopP *float64 DoSample *bool Stop *[]string - ModelSeries *string + ModelType *string Effort *string Verbosity *string } diff --git a/internal/service/model_service.go b/internal/service/model_service.go index e853789a71..b382a12922 100644 --- a/internal/service/model_service.go +++ b/internal/service/model_service.go @@ -776,7 +776,7 @@ func (m *ModelProviderService) ChatToModel(providerName, instanceName, modelName return nil, common.CodeNotFound, errors.New(fmt.Sprintf("provider %s model %s not found", providerName, modelName)) } - modelConfig.ModelSeries = model.Series + modelConfig.ModelType = model.Type var extra map[string]string err = json.Unmarshal([]byte(instance.Extra), &extra)