diff --git a/internal/entity/models/moonshot.go b/internal/entity/models/moonshot.go index f35558ef8b..448a822686 100644 --- a/internal/entity/models/moonshot.go +++ b/internal/entity/models/moonshot.go @@ -17,11 +17,14 @@ package models import ( + "bufio" "bytes" "encoding/json" "fmt" "io" "net/http" + "ragflow/internal/logger" + "strings" "time" ) @@ -54,18 +57,279 @@ func (z *MoonshotModel) Name() string { } // Chat sends a message and returns response -func (z *MoonshotModel) Chat(modelName, message *string, apiConfig *APIConfig, chatModelConfig *ChatConfig) (*ChatResponse, error) { - return nil, fmt.Errorf("not implemented") +func (k *MoonshotModel) Chat(modelName, message *string, apiConfig *APIConfig, chatModelConfig *ChatConfig) (*ChatResponse, error) { + if message == nil { + return nil, fmt.Errorf("message is nil") + } + + var region = "default" + if apiConfig.Region != nil { + region = *apiConfig.Region + } + + url := fmt.Sprintf("%s/%s", k.BaseURL[region], k.URLSuffix.Chat) + + // Build request body + reqBody := map[string]interface{}{ + "model": modelName, + "messages": []map[string]string{ + {"role": "user", "content": *message}, + }, + "stream": false, + } + + if chatModelConfig.Stream != nil { + reqBody["stream"] = *chatModelConfig.Stream + } + + if chatModelConfig.MaxTokens != nil { + reqBody["max_tokens"] = *chatModelConfig.MaxTokens + } + + if chatModelConfig.TopP != nil { + reqBody["top_p"] = *chatModelConfig.TopP + } + + if chatModelConfig.Stop != nil { + reqBody["stop"] = *chatModelConfig.Stop + } + + if chatModelConfig.Thinking != nil { + if *chatModelConfig.Thinking { + reqBody["thinking"] = map[string]interface{}{ + "type": "enabled", + } + } else { + reqBody["thinking"] = map[string]interface{}{ + "type": "disabled", + } + } + } + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", *apiConfig.ApiKey)) + + resp, err := k.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var result map[string]interface{} + if err = json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + choices, ok := result["choices"].([]interface{}) + if !ok || len(choices) == 0 { + return nil, fmt.Errorf("no choices in response") + } + + firstChoice, ok := choices[0].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid choice format") + } + + messageMap, ok := firstChoice["message"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid message format") + } + + content, ok := messageMap["content"].(string) + if !ok { + return nil, fmt.Errorf("invalid content format") + } + + var reasonContent string + if chatModelConfig.Thinking != nil && *chatModelConfig.Thinking { + reasonContent, ok = messageMap["reasoning_content"].(string) + if !ok { + return nil, fmt.Errorf("invalid content format") + } + // if first char of reasonContent is \n remove the \n + if reasonContent != "" && reasonContent[0] == '\n' { + reasonContent = reasonContent[1:] + } + } + + chatResponse := &ChatResponse{ + Answer: &content, + ReasonContent: &reasonContent, + } + + return chatResponse, nil } // ChatWithMessages sends multiple messages with roles and returns response -func (z *MoonshotModel) ChatWithMessages(modelName string, apiKey *string, messages []Message, chatModelConfig *ChatConfig) (string, error) { - return "", fmt.Errorf("%s, ChatWithMessages not implemented", z.Name()) +func (k *MoonshotModel) ChatWithMessages(modelName string, apiKey *string, messages []Message, chatModelConfig *ChatConfig) (string, error) { + return "", fmt.Errorf("%s, ChatWithMessages not implemented", k.Name()) } // ChatStreamlyWithSender sends a message and streams response via sender function (best performance, no channel) -func (z *MoonshotModel) ChatStreamlyWithSender(modelName, message *string, apiConfig *APIConfig, chatModelConfig *ChatConfig, sender func(*string, *string) error) error { - return fmt.Errorf("not implemented") +func (k *MoonshotModel) ChatStreamlyWithSender(modelName, message *string, apiConfig *APIConfig, chatModelConfig *ChatConfig, sender func(*string, *string) error) error { + var region = "default" + if apiConfig.Region != nil { + region = *apiConfig.Region + } + + url := fmt.Sprintf("%s/chat/completions", k.BaseURL[region]) + + // Build request body with streaming enabled + reqBody := map[string]interface{}{ + "model": modelName, + "messages": []map[string]string{ + {"role": "user", "content": *message}, + }, + "stream": true, + } + + if chatModelConfig.Stream != nil { + reqBody["stream"] = *chatModelConfig.Stream + } + + if chatModelConfig.MaxTokens != nil { + reqBody["max_tokens"] = *chatModelConfig.MaxTokens + } + + if chatModelConfig.Temperature != nil { + reqBody["temperature"] = *chatModelConfig.Temperature + } + + if chatModelConfig.DoSample != nil { + reqBody["do_sample"] = *chatModelConfig.DoSample + } + + if chatModelConfig.TopP != nil { + reqBody["top_p"] = *chatModelConfig.TopP + } + + if chatModelConfig.Stop != nil { + reqBody["stop"] = *chatModelConfig.Stop + } + + if chatModelConfig.Thinking != nil { + if *chatModelConfig.Thinking { + reqBody["thinking"] = map[string]interface{}{ + "type": "enabled", + } + } else { + reqBody["thinking"] = map[string]interface{}{ + "type": "disabled", + } + } + } + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", *apiConfig.ApiKey)) + + resp, err := k.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + // SSE parsing: read line by line + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + logger.Info(line) + + // SSE data line starts with "data:" + if !strings.HasPrefix(line, "data:") { + continue + } + + // Extract JSON after "data:" + data := strings.TrimSpace(line[5:]) + + // [DONE] marks the end of stream + if data == "[DONE]" { + break + } + + // Parse the JSON event + var event map[string]interface{} + if err = json.Unmarshal([]byte(data), &event); err != nil { + continue + } + + choices, ok := event["choices"].([]interface{}) + if !ok || len(choices) == 0 { + continue + } + + firstChoice, ok := choices[0].(map[string]interface{}) + if !ok { + continue + } + + delta, ok := firstChoice["delta"].(map[string]interface{}) + if !ok { + continue + } + + reasoningContent, ok := delta["reasoning_content"].(string) + if ok && reasoningContent != "" { + if err := sender(nil, &reasoningContent); err != nil { + return err + } + } + + content, ok := delta["content"].(string) + if ok && content != "" { + if err := sender(&content, nil); err != nil { + return err + } + } + + finishReason, ok := firstChoice["finish_reason"].(string) + if ok && finishReason != "" { + break + } + } + + // Send [DONE] marker for OpenAI compatibility + endOfStream := "[DONE]" + if err = sender(&endOfStream, nil); err != nil { + return err + } + + return scanner.Err() } // EncodeToEmbedding encodes a list of texts into embeddings