// Package component — LLM (T1). // // One-shot LLM call. Reads system_prompt + user_prompt, dispatches to a // chat model, and returns the assistant's content. Streaming variant // forwards incremental chunks via Stream. // // Model invocation is abstracted behind a small ChatInvoker interface so // tests can inject a stub without touching the network. The default // ChatInvoker is built around models.NewEinoChatModel so production paths // flow through the eino bridge (plan §2.11.6 D1). package component import ( "context" "encoding/json" "fmt" "regexp" "slices" "sort" "strings" "sync" "time" "github.com/cloudwego/eino/schema" "ragflow/internal/agent/component/prompts" "ragflow/internal/agent/runtime" "ragflow/internal/common" "ragflow/internal/entity/models" "ragflow/internal/tokenizer" "go.uber.org/zap" ) // LLMComponent is a one-shot chat call. type LLMComponent struct { param LLMParam } // LLMParam captures the (resolved) DSL parameters for an LLM node. type LLMParam struct { ModelID string SystemPrompt string UserPrompt string Temperature *float64 TopP *float64 VisualFiles []string // extracted data:image URIs from inputs["visual_files"] Cite bool // when true, citation-instruction prompt is appended to system message MessageHistoryWindowSize int // when >0, the last N turns from state.History are prepended as prior messages ChatTemplateKwargs map[string]any // optional provider-specific kwargs (e.g. response_format, seed) MaxTokens *int JSONOutput bool OutputStructure map[string]any // when set, LLM is asked for JSON matching this schema (best-effort keys); outputs["structured"] populated // PresencePenalty mirrors Python's `presence_penalty` (range -2.0 to 2.0). // Positive values penalize new tokens based on whether they appear in the // text so far, increasing the model's likelihood to talk about new topics. PresencePenalty *float64 // FrequencyPenalty mirrors Python's `frequency_penalty` (range -2.0 to 2.0). // Positive values penalize new tokens based on their existing frequency // in the text so far, decreasing the model's likelihood to repeat the // same line verbatim. FrequencyPenalty *float64 // Driver is the provider driver to use (e.g. "openai", "dummy"). When // empty, the default ChatInvoker will look up a driver from ModelID // (e.g. by attempting NewDummyModel for unknown providers). Driver string // APIKey overrides the default empty key. Tests may set this; prod // reads it from env / secret store at higher layers. APIKey string // BaseURL overrides the driver default endpoint (e.g. to point the // "openai" driver at a third-party gateway). Empty defers to the // driver's built-in default URL. BaseURL string // MaxRetries caps the retry loop in retryInvoker. Zero = default // (3). Negative = disable retries entirely (single attempt). The // retry loop honours ctx.Done() so a request cancel aborts on // the next backoff sleep. MaxRetries int // DelayAfterError is the initial backoff between retry attempts. // Doubles on each retry, capped at 1 minute. Zero = default // (2 seconds). Matches Python's `delay_after_error` param. DelayAfterError time.Duration // Thinking mirrors the python `thinking` Agent LLM setting // (PR #15446). When set to "enabled" or "disabled", the LLM // driver is told to turn its reasoning mode on/off // (provider-specific; see chat_model.py for Qwen/Kimi/GLM // policy). Empty string means "system default" — the LLM // driver decides, which today means Qwen3 is sent // `enable_thinking=false` unless overridden. Thinking string } // LLMInput is the resolved input map the factory / Invoke expects. type LLMInput struct { ModelID string SystemPrompt string UserPrompt string Temperature *float64 TopP *float64 Cite bool MessageHistoryWindowSize int ChatTemplateKwargs map[string]any MaxTokens *int JSONOutput bool OutputStructure map[string]any Driver string APIKey string Thinking string // "enabled" | "disabled" | "" } // LLMOutput mirrors the outputs map (per plan §2.11.3 row 5): // // "content" string, "model" string, "stopped" bool, "tokens" int // // JSONOutput=true additionally populates "json" (map[string]any) when the // content parses as a JSON object. type LLMOutput struct { Content string Model string Stopped bool Tokens int } // ChatInvoker is the abstraction the LLM component uses to talk to a // chat model. The default implementation lives in this file; tests can // override the package-level defaultChatInvoker to inject a stub. type ChatInvoker interface { Invoke(ctx context.Context, req ChatInvokeRequest) (*ChatInvokeResponse, error) } // ChatInvokeRequest is the minimal surface the LLM component needs to // dispatch a chat call. Driver / APIKey / ModelName are kept here so the // invoker can wire the right provider without the component caring. type ChatInvokeRequest struct { Driver string ModelName string APIKey string BaseURL string Messages []schema.Message Temperature *float64 TopP *float64 PresencePenalty *float64 FrequencyPenalty *float64 MaxTokens *int // Thinking mirrors the agent-level `thinking` setting // ("enabled" | "disabled" | ""). The default invoker is // responsible for translating this into the provider-specific // request body (e.g. Qwen `enable_thinking`, Kimi/GLM // `thinking.type`). Empty string means "use provider default" // and the invoker should leave the provider's reasoning mode // untouched. Thinking string } // ChatInvokeResponse mirrors what the LLM component writes to its outputs. type ChatInvokeResponse struct { Content string Model string Stopped bool Tokens int } // defaultChatInvokerMu guards defaultChatInvoker swaps during tests. var defaultChatInvokerMu sync.RWMutex // defaultChatInvoker is the production ChatInvoker. Replaced in tests. var defaultChatInvoker ChatInvoker = &einoChatInvoker{} // SetDefaultChatInvoker swaps the package-level ChatInvoker (test helper). // Pass nil to restore the default. Concurrent-safe. func SetDefaultChatInvoker(inv ChatInvoker) { defaultChatInvokerMu.Lock() defer defaultChatInvokerMu.Unlock() defaultChatInvoker = inv } // getDefaultChatInvoker returns the current default ChatInvoker. func getDefaultChatInvoker() ChatInvoker { defaultChatInvokerMu.RLock() defer defaultChatInvokerMu.RUnlock() if defaultChatInvoker == nil { return &einoChatInvoker{} } return defaultChatInvoker } // GetDefaultChatInvokerForTest exposes the current package-level invoker so // cross-package tests can swap it and restore it safely. func GetDefaultChatInvokerForTest() ChatInvoker { return getDefaultChatInvoker() } // einoChatInvoker is the production ChatInvoker — it constructs a fresh // models.EinoChatModel per call from the request and dispatches. type einoChatInvoker struct{} // Invoke satisfies ChatInvoker. func (e *einoChatInvoker) Invoke(ctx context.Context, req ChatInvokeRequest) (*ChatInvokeResponse, error) { if req.ModelName == "" { return nil, fmt.Errorf("component: LLM: model_id is required") } driver := req.Driver modelName := req.ModelName if driver == "" && modelName != "" { if bareModelName, providerName, ok := splitCompositeLLMID(modelName); ok { driver = providerName modelName = bareModelName } } if driver == "" { driver = "dummy" } baseURL := baseURLMapForDriver(driver, req.BaseURL) // urlSuffix: each driver appends URLSuffix.Chat to baseURL to form // the chat-completions endpoint (e.g. "chat/completions" for // openai-compatible drivers, "v1/messages" for anthropic). The // factory's NewModelDriver accepts a zero URLSuffix and stores it // as-is; the openai driver then builds `/` (with no path), // which is the wrong endpoint for a v1-root base URL. We seed // the right suffix per driver here so the factory and the // openai driver's URL construction agree. urlSuffix := chatURLSuffixFor(driver) d, err := models.NewModelFactory().CreateModelDriver(driver, baseURL, urlSuffix) if err != nil { return nil, fmt.Errorf("component: LLM: resolve driver %q: %w", driver, err) } if d == nil { return nil, fmt.Errorf("component: LLM: no driver for %q", driver) } apiKey := req.APIKey cfg := &models.APIConfig{ApiKey: &apiKey} cm := models.NewChatModel(d, &modelName, cfg) chatCfg := &models.ChatConfig{ Temperature: req.Temperature, TopP: req.TopP, MaxTokens: req.MaxTokens, } // Propagate the agent-level Thinking setting to the driver so // providers like DeepSeek can send thinking: {type: "disabled"} // and prevent chain-of-thought from leaking into the answer. // Mirrors the Python agent/component/llm.py behaviour. switch req.Thinking { case "enabled": t := true chatCfg.Thinking = &t case "disabled": f := false chatCfg.Thinking = &f } wrapper := models.NewEinoChatModel(cm, chatCfg) out, err := wrapper.Generate(ctx, toEinoMessages(req.Messages)) if err != nil { return nil, err } return &ChatInvokeResponse{ Content: out.Content, Model: modelName, Stopped: true, Tokens: 0, }, nil } // toEinoMessages converts the LLM component's Message slice to eino's. // // Copies Role, Content, AND UserInputMultiContent (multi-modal parts), // including a deep copy of the *string URL pointers in each image part // so that callers may mutate the returned messages without affecting // the source. Without the multi-content copy and pointer deep-copy, // vision inputs would be silently dropped or shared with the caller. func toEinoMessages(msgs []schema.Message) []*schema.Message { if len(msgs) == 0 { return nil } out := make([]*schema.Message, 0, len(msgs)) for i := range msgs { m := msgs[i] role := m.Role if role == "" { role = schema.User } cloned := slices.Clone(m.UserInputMultiContent) for j, p := range cloned { if p.Image != nil { imgCopy := *p.Image if p.Image.URL != nil { u := *p.Image.URL imgCopy.URL = &u } cloned[j].Image = &imgCopy } } out = append(out, &schema.Message{ Role: role, Content: m.Content, UserInputMultiContent: cloned, }) } return out } // chatURLSuffixFor returns the URLSuffix the factory should pass to // the driver for the chat endpoint. Each driver's ChatWithMessages // builds `baseURL/URLSuffix.Chat`, so the suffix has to match the // provider's actual chat path. We seed the common ones here; for any // driver the factory has no entry for, we fall through to a default // "chat/completions" path (the openai-compatible default), which // matches the dummy driver and any third-party openai-compatible // gateway. func chatURLSuffixFor(driver string) models.URLSuffix { switch strings.ToLower(driver) { case "anthropic": return models.URLSuffix{Chat: "v1/messages"} case "ollama": return models.URLSuffix{Chat: "api/chat"} default: return models.URLSuffix{Chat: "chat/completions"} } } func baseURLMapForDriver(driver, override string) map[string]string { if override != "" { return map[string]string{"default": override} } pm := models.GetProviderManager() if pm == nil { return nil } provider := pm.FindProvider(driver) if provider == nil || len(provider.URL) == 0 { return nil } baseURL := make(map[string]string, len(provider.URL)) for region, url := range provider.URL { baseURL[region] = url } return baseURL } // NewLLMComponent builds an LLMComponent from raw params. func NewLLMComponent(p LLMParam) *LLMComponent { return &LLMComponent{param: p} } // Name returns the registered component name. func (c *LLMComponent) Name() string { return "LLM" } // Invoke runs the LLM and returns the output map. func (c *LLMComponent) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) { p := mergeLLMParam(c.param, inputs) if p.ModelID == "" { return nil, &ParamError{Field: "model_id", Reason: "required"} } if p.UserPrompt == "" && p.SystemPrompt == "" { return nil, &ParamError{Field: "user_prompt", Reason: "at least one of user_prompt or system_prompt must be set"} } // Resolve {{cpn_id@var}} references in the system and user // prompts against the canvas state attached to ctx. When the // state is absent (e.g. tests that call Invoke directly without // going through the canvas scheduler), the prompts pass through // unchanged — backward compatible. if state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx); err == nil && state != nil { // ResolveTemplate returns the partial output (with "" in place // of unresolved refs) even on error — we accept the partial // output and log the error for diagnostics. This matches // Python's silent-soft-fail behavior (canvas.py returns "" for // missing refs) but adds a log line so misconfigured canvases // are still surfaced. if resolved, rerr := runtime.ResolveTemplate(p.SystemPrompt, state); resolved != p.SystemPrompt || rerr == nil { p.SystemPrompt = resolved if rerr != nil { common.Warn("component: LLM: resolve system_prompt", zap.Error(rerr)) } } if resolved, rerr := runtime.ResolveTemplate(p.UserPrompt, state); resolved != p.UserPrompt || rerr == nil { p.UserPrompt = resolved if rerr != nil { common.Warn("component: LLM: resolve user_prompt", zap.Error(rerr)) } } } // The Anthropic driver (and the openai chat-completions driver // when the system role is dropped) reject a system-only message // list with "messages is empty" / 400. v1 fixtures frequently // ship only a system prompt; fall back to using the system text // as the user message so the call still goes through. The // answer text in that case is the model continuing the // instruction in its reply slot, which is what the v1 fixtures // also expect. if p.UserPrompt == "" { p.UserPrompt = p.SystemPrompt } // Collect sys.files from canvas globals and inject their // content into prompts and the image list. Mirrors Python's // _collect_sys_files and the injection path in // _prepare_prompt_variables (llm.py:225-281). var sysFileTexts []string var sysFileImgs []string hasSysFilesPlaceholder := strings.Contains(p.SystemPrompt, "{sys.files}") || strings.Contains(p.UserPrompt, "{sys.files}") if state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx); err == nil && state != nil { sysFileTexts, sysFileImgs = collectSysFiles(state) if len(sysFileImgs) > 0 { p.VisualFiles = dedupStrings(append(p.VisualFiles, sysFileImgs...)) } } // When the prompt contains an explicit {sys.files} placeholder, // replace it with the collected file text and clear sysFileTexts // so it is not injected again below. if hasSysFilesPlaceholder { joined := strings.Join(sysFileTexts, "\n\n") p.SystemPrompt = strings.ReplaceAll(p.SystemPrompt, "{sys.files}", joined) p.UserPrompt = strings.ReplaceAll(p.UserPrompt, "{sys.files}", joined) sysFileTexts = nil } msgs := buildMessagesWithImages(p.SystemPrompt, p.UserPrompt, p.VisualFiles, p.Cite) // Inject sys.files text content into the last user message. if len(sysFileTexts) > 0 { joined := strings.Join(sysFileTexts, "\n\n") if len(msgs) > 0 && msgs[len(msgs)-1].Role == schema.User { last := &msgs[len(msgs)-1] if len(last.UserInputMultiContent) > 0 { inserted := false for i := range last.UserInputMultiContent { if last.UserInputMultiContent[i].Type == schema.ChatMessagePartTypeText { if last.UserInputMultiContent[i].Text != "" { last.UserInputMultiContent[i].Text += "\n\n" + joined } else { last.UserInputMultiContent[i].Text = joined } inserted = true break } } if !inserted { last.UserInputMultiContent = append([]schema.MessageInputPart{{ Type: schema.ChatMessagePartTypeText, Text: joined, }}, last.UserInputMultiContent...) } } else if last.Content != "" { last.Content += "\n\n" + joined } else { last.Content = joined } } else { msgs = append(msgs, schema.Message{Role: schema.User, Content: joined}) } } // Prepend the last N turns of conversation history from the // canvas state. Mirrors Python's `_get_chat_template_kwargs` / // `_fit_messages` path. When window size is 0 or history is // empty, // this is a no-op. if p.MessageHistoryWindowSize > 0 { if state, _, sErr := runtime.GetStateFromContext[*runtime.CanvasState](ctx); sErr == nil && state != nil { msgs = prependHistory(msgs, state.History, p.MessageHistoryWindowSize) } } // Apply message fitting (trim to context window) after all // prompt/history/sys.files augmentation and before invoking the // LLM. Mirrors Python's message_fit_in in PR #16413. { maxCtx := 0 if p.MaxTokens != nil { maxCtx = *p.MaxTokens } // The system prompt is already embedded as the first message // in msgs by buildMessagesWithImages; pass "" so fitMessages // does not duplicate it. fitted, fitErr := fitMessages("", msgs, maxCtx) if fitErr != "" { return map[string]any{"content": fitErr}, nil } msgs = fitted } inv := getDefaultChatInvoker() // Param-level retry override. When MaxRetries OR // DelayAfterError is set on LLMParam, the user is asking // for a per-call retry budget. We RE-WRAP the default // invoker in a fresh retryInvoker that respects those // values literally. // // LLM retry normal-absolute-count: when MaxRetries OR // DelayAfterError is explicitly set on LLMParam, the // operator's intent is an ABSOLUTE attempt budget. The // default invoker installed at boot in cmd/server_main.go // is itself a retryInvoker wrapping einoChatInvoker. // Without unwrapping, the two loops would multiplicatively // stack: // // boot=3, MaxRetries=5 → up to (3+1) × (5+1) = 24 // invocations, not the 6 the // operator almost certainly intended. // // unwrapChatInvoker peels off any retryInvoker layers to // reach the bare invoker, then the param-override branch // wraps that bare invoker in a fresh retryInvoker with the // operator's literal values. Net effect: the absolute attempt // count is exactly (MaxRetries + 1), independent of the boot // layer. // // Operators who do NOT set MaxRetries (both fields zero) get // the boot retry chain unchanged. The unit tests in // llm_retry_test.go pin both the unwrap behaviour and the // stacking-prevention contract. hasOverride := p.MaxRetries > 0 || p.DelayAfterError > 0 if hasOverride { maxRetries := p.MaxRetries delay := p.DelayAfterError if delay <= 0 { delay = retryInvokerBackoff } // Normalise the attempt budget: peel off the boot // retryInvoker layer (if any) so the operator's // MaxRetries is an absolute count, not a stacked one. inv = newRetryInvoker(unwrapChatInvoker(inv), maxRetries, delay) } resp, err := inv.Invoke(ctx, ChatInvokeRequest{ Driver: p.Driver, ModelName: p.ModelID, APIKey: p.APIKey, BaseURL: p.BaseURL, Messages: msgs, Temperature: p.Temperature, TopP: p.TopP, PresencePenalty: p.PresencePenalty, FrequencyPenalty: p.FrequencyPenalty, MaxTokens: p.MaxTokens, Thinking: p.Thinking, }) if err != nil { return nil, fmt.Errorf("component: LLM.Invoke: %w", err) } // Strip think blocks + JSON fences from the response. // Mirrors Python's clean_formated_answer() exactly // (re.sub(r"^.*", "", ...) + ^.*```json + trailing ```). // Python only cleans for structured output — keep raw content for // regular responses (llm.py:483: self.set_output("content", ans)). cleaned := resp.Content if p.OutputStructure != nil || p.JSONOutput { cleaned = cleanFormattedAnswer(resp.Content) } out := map[string]any{ "content": cleaned, "model": resp.Model, "stopped": resp.Stopped, "tokens": resp.Tokens, } if p.JSONOutput { var parsed map[string]any if err := json.Unmarshal([]byte(resp.Content), &parsed); err == nil { out["json"] = parsed } else { // Surface a non-fatal warning — caller can still read "content". common.Warn("component: LLM: json_output=true but content is not valid JSON", zap.Error(err)) } } if p.OutputStructure != nil { // Best-effort parse: if the first response isn't valid JSON // (or doesn't contain the expected top-level keys), retry once // with a re-prompt. OutputStructure is treated as a key-set // hint; deep schema validation (types, nested objects) is // deferred to a future phase. parsed, ok := matchOutputStructure(resp.Content, p.OutputStructure) if !ok { retryResp, err := inv.Invoke(ctx, ChatInvokeRequest{ Driver: p.Driver, ModelName: p.ModelID, APIKey: p.APIKey, BaseURL: p.BaseURL, Messages: buildStructuredRetryMessages(p.SystemPrompt, p.UserPrompt, p.VisualFiles, p.Cite, p.OutputStructure, resp.Content), Temperature: p.Temperature, TopP: p.TopP, PresencePenalty: p.PresencePenalty, FrequencyPenalty: p.FrequencyPenalty, MaxTokens: p.MaxTokens, Thinking: p.Thinking, }) if err == nil { parsed, ok = matchOutputStructure(retryResp.Content, p.OutputStructure) if ok { resp = retryResp } } } if ok { out["structured"] = parsed // Also update content to the validated response so // downstream consumers reading "content" get the JSON text. out["content"] = cleanFormattedAnswer(resp.Content) } else { common.Warn("component: LLM: output_structure set but no parseable JSON after retry") } } return out, nil } // Stream implements Component.Stream. It yields incremental chunks via // the returned channel; the channel is closed when the model finishes. // // The pattern follows the goroutine + buffered-channel + select-on-ctx // idiom: one goroutine produces chunks, the consumer selects between // receiving and ctx-cancellation. Backpressure is mitigated by the 16- // element channel buffer. // // Each chunk is a map[string]any with two keys: // - "thinking" (string): the model's reasoning content, empty if absent // - "content" (string): the model's visible content // // A final chunk with key "done" (bool=true) signals end-of-stream so // downstream consumers can flush state without relying on channel close // alone (close also works; the "done" key is informational). // // Today, the LLM driver layer returns a single non-streamed response, // so this v1 emits exactly one chunk + one done. Hooking the actual // eino stream (EinoChatModel.Stream at internal/entity/models/llm.go:137) // is deferred — the public surface here is correct, only the data // source needs to be swapped to a real StreamReader consumer in a // follow-up. func (c *LLMComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) { out := make(chan map[string]any, 16) go func() { defer close(out) // Early bail-out for pre-cancelled contexts: don't run the // (potentially expensive) LLM call when the consumer has // already given up. Honors the documented select-on-ctx // pattern at the goroutine entry, not just between chunks. if err := ctx.Err(); err != nil { return } result, err := c.Invoke(ctx, inputs) if err != nil { select { case out <- map[string]any{"error": err.Error()}: case <-ctx.Done(): } return } // Single non-streamed response — emit as one content chunk. // A real streaming integration would loop over a channel // here and emit multiple chunks with partial content. chunk := map[string]any{ "thinking": "", "content": result["content"], } select { case out <- chunk: case <-ctx.Done(): return } // Final done marker. select { case out <- map[string]any{"done": true, "model": result["model"]}: case <-ctx.Done(): } }() return out, nil } // Inputs returns parameter metadata for tooling. func (c *LLMComponent) Inputs() map[string]string { return map[string]string{ "model_id": "Provider-side model identifier (e.g. \"gpt-4o-mini\")", "system_prompt": "Optional system prompt prepended to the conversation", "user_prompt": "User prompt; supports {{cpn_id@param}} references resolved by the canvas engine", "temperature": "Sampling temperature (0.0-2.0). Optional.", "top_p": "Top-p (nucleus) sampling cutoff (0.0-1.0). Optional.", "presence_penalty": "Presence penalty (-2.0 to 2.0). Positive values encourage new topics. Optional.", "frequency_penalty": "Frequency penalty (-2.0 to 2.0). Positive values discourage repetition. Optional.", "visual_files": "List of image URIs (data:image/... base64) attached to the user message as multi-modal content.", "cite": "When true (default), the citation-instruction prompt is appended to the system message.", "output_structure": "Optional map of expected top-level keys. LLM is asked to produce JSON containing these keys; one retry on failure. Populates outputs[\"structured\"].", "max_tokens": "Maximum tokens to generate. Optional.", "json_output": "If true, attempt to JSON-parse \"content\" into \"json\" output key.", "driver": "Provider driver name (openai, anthropic, …). Defaults to \"dummy\".", "api_key": "Override API key for this call. Empty defers to env.", "base_url": "Override the driver default endpoint URL.", } } // Outputs returns output metadata. func (c *LLMComponent) Outputs() map[string]string { return map[string]string{ "content": "Assistant text response", "model": "Model identifier echoed back (sanity check)", "stopped": "True if the model finished naturally", "tokens": "Reported token count (0 when not reported by the driver)", "json": "When json_output=true and content parses as a JSON object, the parsed map", } } // buildMessages assembles a system + user message sequence. Order: // system first (if set), then user. func buildMessages(system, user string) []schema.Message { out := make([]schema.Message, 0, 2) if system != "" { out = append(out, schema.Message{Role: schema.System, Content: system}) } if user != "" { out = append(out, schema.Message{Role: schema.User, Content: user}) } return out } // injectCitationPrompt returns the system message with the canonical // citation-instruction text appended. When system is empty, returns // the prompt as-is. Two newlines separate the user's system prompt // from the citation block so the LLM can parse them distinctly. // matchOutputStructure parses the LLM response and returns the // parsed map iff it is a JSON object that contains every top-level // key in expected. Inner-type validation is deferred — a future // phase will use a JSON-schema validator. func matchOutputStructure(content string, expected map[string]any) (map[string]any, bool) { var parsed map[string]any if err := json.Unmarshal([]byte(content), &parsed); err != nil { return nil, false } for k := range expected { if _, ok := parsed[k]; !ok { return nil, false } } return parsed, true } // buildStructuredRetryMessages rebuilds the message list with a // follow-up user turn that surfaces the LLM's first response and // asks for valid JSON matching the expected top-level keys. The // retry uses the same chat invoker on the next call; the message // list returned here is what gets sent on the retry. func buildStructuredRetryMessages(system, user string, images []string, cite bool, expected map[string]any, prevContent string) []schema.Message { msgs := buildMessagesWithImages(system, user, images, cite) keys := make([]string, 0, len(expected)) for k := range expected { keys = append(keys, k) } sort.Strings(keys) keysList := strings.Join(keys, ", ") retryUser := "Your previous response was not valid JSON matching the requested schema.\n\n" + "Previous response:\n" + prevContent + "\n\n" + "Please re-generate the response as a single valid JSON object containing all of these top-level keys: " + keysList + ".\n" + "Output ONLY the JSON object — no prose, no markdown code fences." if len(msgs) > 0 { msgs[len(msgs)-1] = schema.Message{ Role: schema.User, Content: retryUser, } } return msgs } func injectCitationPrompt(system string) string { prompt := prompts.CitationPrompt() if system == "" { return prompt } return system + "\n\n" + prompt } // dataImageRe matches RFC-2397 data URLs of the form // // data:image/;base64, // // where is an image MIME subtype (including structured types // like "svg+xml" and "vnd.foo") and is base64 in either the // standard alphabet ("+/=") or URL-safe alphabet ("-_=") — the regex // accepts both because real-world emitters (browser data URIs, Python // base64.urlsafe_b64encode) mix them. Validation of the actual bytes // is the driver's job; the regex is intentionally permissive about the // alphabet but strict about the "data:image/...;base64," prefix. // // Note: this regex requires ";base64," immediately after the subtype. // It does NOT accept ";charset=utf-8;base64," or other parameter-prefixed // forms — those are uncommon in canvas inputs and deferred. var dataImageRe = regexp.MustCompile(`data:image/[a-zA-Z0-9.+-]+;base64,[A-Za-z0-9+/=_-]+`) // extractDataImages scans the input strings for data:image/* // base64 URIs and returns the deduplicated set in first-seen // order. The current implementation only walks top-level string // values; recursive walk over nested structs/lists is a future // enhancement (Python's _extract_data_images covers the recursive // case). func extractDataImages(values []string) []string { seen := make(map[string]struct{}) var out []string for _, v := range values { for _, m := range dataImageRe.FindAllString(v, -1) { if _, dup := seen[m]; dup { continue } seen[m] = struct{}{} out = append(out, m) } } return out } // collectSysFiles splits sys.files from canvas globals into text parts // and image data URIs. The caller is responsible for handling any // {sys.files} placeholder replacement in the prompts. func collectSysFiles(state *runtime.CanvasState) (textParts, imageURIs []string) { files, ok := state.Globals["sys.files"] if !ok { return nil, nil } fileList, ok := files.([]any) if !ok || len(fileList) == 0 { return nil, nil } for _, f := range fileList { s, ok := f.(string) if !ok { continue } if strings.HasPrefix(s, "data:image/") { imageURIs = append(imageURIs, s) } else { textParts = append(textParts, s) } } return textParts, imageURIs } // dedupStrings returns the deduplicated slice in first-seen order. func dedupStrings(vals []string) []string { seen := make(map[string]struct{}, len(vals)) out := make([]string, 0, len(vals)) for _, v := range vals { if _, dup := seen[v]; dup { continue } seen[v] = struct{}{} out = append(out, v) } return out } // prependHistory inserts up to `window` prior turns from the canvas // history before the current system+user messages. Each history entry // is a {role, content} map; only the last `window` are kept, with // assistant/user roles preserved. Invalid entries (missing role or // content) are skipped silently. func prependHistory(current []schema.Message, history []map[string]any, window int) []schema.Message { if window <= 0 || len(history) == 0 { return current } start := 0 if len(history) > window { start = len(history) - window } out := make([]schema.Message, 0, len(current)+(len(history)-start)) for i := start; i < len(history); i++ { entry := history[i] role, _ := entry["role"].(string) content, _ := entry["content"].(string) if role == "" || content == "" { continue } out = append(out, schema.Message{Role: schema.RoleType(role), Content: content}) } return append(out, current...) } // buildMessagesWithImages assembles a system + user message sequence, // attaching data:image URIs as eino multi-modal content parts when // present. Without images the function is identical to buildMessages. // // When cite is true, the citation-instruction prompt is appended to the // system message (creating one if it was empty). This mirrors the // Python LLM._prepare_prompt_variables path where cite=True // triggers `citation_prompt()` injection. The post-stream // grounding call (Python's _gen_citations_async) is the // RetrievalService-driven citation enhancement. // // Each image is wrapped in a MessageInputPart{Type: "image_url", // Image: &MessageInputImage{MessagePartCommon{URL: dataURI}}}. The // driver layer (anthropic.go:254, google.go:168) recognises the // "image_url" part type and translates to the provider-native format. // Using URL (rather than splitting into Base64Data + MIMEType) keeps the // data URI intact, which matches the existing anthropic_test.go:221 // fixture format. func buildMessagesWithImages(system, user string, images []string, cite bool) []schema.Message { if cite { system = injectCitationPrompt(system) } out := make([]schema.Message, 0, 2) if system != "" { out = append(out, schema.Message{Role: schema.System, Content: system}) } if len(images) == 0 { if user != "" { out = append(out, schema.Message{Role: schema.User, Content: user}) } return out } parts := make([]schema.MessageInputPart, 0, 1+len(images)) if user != "" { parts = append(parts, schema.MessageInputPart{ Type: schema.ChatMessagePartTypeText, Text: user, }) } for _, uri := range images { u := uri parts = append(parts, schema.MessageInputPart{ Type: schema.ChatMessagePartTypeImageURL, Image: &schema.MessageInputImage{ MessagePartCommon: schema.MessagePartCommon{URL: &u}, }, }) } out = append(out, schema.Message{ Role: schema.User, UserInputMultiContent: parts, }) return out } // mergeLLMParam layers raw inputs over the receiver's default param set. // // v1 DSL aliases accepted alongside the v2 names: // // "llm_id" → "model_id" // "sys_prompt" → "system_prompt" // "base_url" → "BaseURL" // // The v1 fixtures in internal/agent/dsl/testdata use the // short forms; without these aliases the v1→v2 conversion (plan §2.5) // would have to be run before the factory builds the component, which // the e2e compile+invoke path doesn't do. func mergeLLMParam(base LLMParam, inputs map[string]any) LLMParam { p := base if v, ok := stringFrom(inputs, "model_id"); ok { p.ModelID = v } else if v, ok := stringFrom(inputs, "llm_id"); ok { p.ModelID = v } if v, ok := stringFrom(inputs, "system_prompt"); ok { p.SystemPrompt = v } else if v, ok := stringFrom(inputs, "sys_prompt"); ok { p.SystemPrompt = v } if v, ok := stringFrom(inputs, "user_prompt"); ok { p.UserPrompt = v } if v, ok := boolFrom(inputs, "json_output"); ok { p.JSONOutput = v } if v, ok := mapFrom(inputs, "output_structure"); ok { p.OutputStructure = v } if v, ok := boolFrom(inputs, "cite"); ok { p.Cite = v } if v, ok := intFrom(inputs, "message_history_window_size"); ok { p.MessageHistoryWindowSize = v } if v, ok := mapFrom(inputs, "chat_template_kwargs"); ok { p.ChatTemplateKwargs = v } if v, ok := stringFrom(inputs, "driver"); ok { p.Driver = v } if v, ok := stringFrom(inputs, "api_key"); ok { p.APIKey = v } if v, ok := stringFrom(inputs, "base_url"); ok { p.BaseURL = v } if v, ok := floatFrom(inputs, "temperature"); ok { f := v p.Temperature = &f } if v, ok := floatFrom(inputs, "top_p"); ok { f := v p.TopP = &f } if v, ok := floatFrom(inputs, "presence_penalty"); ok { f := v p.PresencePenalty = &f } if v, ok := floatFrom(inputs, "frequency_penalty"); ok { f := v p.FrequencyPenalty = &f } // visual_files: accept []string or single string with embedded // data URIs. The current implementation only walks top-level // string values; recursive walk is a future enhancement. if v, ok := sliceFrom(inputs, "visual_files"); ok { p.VisualFiles = extractDataImages(v) } else if v, ok := stringFrom(inputs, "visual_files"); ok { p.VisualFiles = extractDataImages([]string{v}) } if v, ok := intFrom(inputs, "max_tokens"); ok { i := v p.MaxTokens = &i } if v, ok := stringFrom(inputs, "thinking"); ok { // Only allow "enabled" or "disabled"; arbitrary DSL // strings are dropped. Python PR #15220 removed // thinking from llm.py's gen_conf() — it is no // longer forwarded to the model. The field is still // parsed here to match the Python form parameter // definition, but einoChatInvoker does not consume // it, consistent with Python's behavior. if v == "enabled" || v == "disabled" { p.Thinking = v } } return p } // effectiveContextLength returns maxLength if positive, otherwise 8192. // Mirrors Python's LLM.effective_context_length in PR #16413 — prevents // zero/negative context windows from silently trimming all prompt content. func effectiveContextLength(maxLength int) int { if maxLength > 0 { return maxLength } return 8192 } // contextFitBudget returns 97% of the effective context length as the // token budget for message_fit_in. Mirrors Python's LLM.context_fit_budget // in PR #16413. func contextFitBudget(maxLength int) int { return int(float64(effectiveContextLength(maxLength)) * 0.97) } // validateFittedMessages checks that the fitted message list is non-empty // and the last message is a non-empty user turn (content or multi-modal // parts). Returns an error string on failure, empty string on success. // Python requires len >= 2 because the system prompt is always injected // upstream; Go allows len >= 1 because the system message may be embedded // inside msgs (from buildMessagesWithImages) or absent entirely. func validateFittedMessages(msgFit []schema.Message) string { if len(msgFit) == 0 { return "**ERROR**: message_fit_in produced insufficient messages for LLM" } last := msgFit[len(msgFit)-1] if last.Role != schema.User { return "**ERROR**: LLM last message is not a user turn after prompt fitting; check model max_tokens context setting" } if strings.TrimSpace(last.Content) == "" && len(last.UserInputMultiContent) == 0 { return "**ERROR**: LLM user message is empty after prompt fitting; check model max_tokens context setting" } return "" } // fitMessages calls message_fit_in semantics on the given messages and // validates that the result ends with a non-empty user turn. Returns the // fitted messages and an error string (empty on success). // Mirrors Python's LLM.fit_messages in PR #16413. func fitMessages(systemPrompt string, msgs []schema.Message, maxLength int) ([]schema.Message, string) { // Convert schema.Message → []map[string]interface{} for fitting. all := make([]map[string]interface{}, 0, 1+len(msgs)) // Deep-copy msgs (mirrors Python's deepcopy) to avoid mutating caller's slice. copied := make([]schema.Message, len(msgs)) for i, m := range msgs { cloned := slices.Clone(m.UserInputMultiContent) for j, p := range cloned { if p.Image != nil { imgCopy := *p.Image if p.Image.URL != nil { u := *p.Image.URL imgCopy.URL = &u } cloned[j].Image = &imgCopy } } copied[i] = schema.Message{ Role: m.Role, Content: m.Content, UserInputMultiContent: cloned, } } // System prompt first (when not already embedded in msgs). if systemPrompt != "" { all = append(all, map[string]interface{}{ "role": "system", "content": systemPrompt, }) } for _, m := range copied { entry := map[string]interface{}{ "role": string(m.Role), "content": m.Content, } if len(m.UserInputMultiContent) > 0 { entry["user_input_multi_content"] = m.UserInputMultiContent } all = append(all, entry) } // Use 97% of effective context as the token budget. budget := contextFitBudget(maxLength) _, fitted := messageFitInRaw(all, budget) // Convert back to []schema.Message. result := make([]schema.Message, 0, len(fitted)) for _, m := range fitted { role, _ := m["role"].(string) content, _ := m["content"].(string) msg := schema.Message{ Role: schema.RoleType(role), Content: content, } if multi, ok := m["user_input_multi_content"].([]schema.MessageInputPart); ok { msg.UserInputMultiContent = multi } result = append(result, msg) } return result, validateFittedMessages(result) } // messageFitInRaw trims messages to fit within a token budget. Operates on // raw []map[string]interface{} (role + content). Returns the token count // used and the trimmed slice. Mirrors Python's message_fit_in in // rag/prompts/generator.py. // // Strategy: // 1. If everything fits → return as-is. // 2. Keep all system messages + the last user/assistant message. // 3. If still too large, trim content proportionally: // - System dominates (>80%) → preserve last message first. // - Otherwise → preserve system first. func messageFitInRaw(messages []map[string]interface{}, maxTokens int) (int, []map[string]interface{}) { if maxTokens <= 0 { maxTokens = 8192 } // Step 1: everything fits. totalTokens := countAllTokens(messages) if totalTokens < maxTokens { return totalTokens, messages } // Step 2: keep all system messages + the last non-system message. result := make([]map[string]interface{}, 0) for _, m := range messages { if role, _ := m["role"].(string); role == "system" { result = append(result, m) } } if len(messages) > 0 { last := messages[len(messages)-1] if role, _ := last["role"].(string); role != "system" { result = append(result, last) } } if len(result) == 0 { return 0, result } totalTokens = countAllTokens(result) if totalTokens < maxTokens { return totalTokens, result } // Step 3: trim content to fit. ll := tokenizer.NumTokensFromString(stringContent(result[0])) ll2 := tokenizer.NumTokensFromString(stringContent(result[len(result)-1])) total := ll + ll2 if total <= 0 { return 0, result } if len(result) == 1 { setContent(result[0], tokenizer.TrimContentToTokenLimit(stringContent(result[0]), maxTokens)) return countAllTokens(result), result } if float64(ll)/float64(total) > 0.8 { preservedLast := min(ll2, maxTokens) setContent(result[len(result)-1], tokenizer.TrimContentToTokenLimit(stringContent(result[len(result)-1]), preservedLast)) remaining := max(0, maxTokens-preservedLast) setContent(result[0], tokenizer.TrimContentToTokenLimit(stringContent(result[0]), remaining)) } else { preservedSystem := min(ll, maxTokens) setContent(result[0], tokenizer.TrimContentToTokenLimit(stringContent(result[0]), preservedSystem)) remaining := max(0, maxTokens-preservedSystem) setContent(result[len(result)-1], tokenizer.TrimContentToTokenLimit(stringContent(result[len(result)-1]), remaining)) } return countAllTokens(result), result } // countAllTokens returns the total token count across all messages. func countAllTokens(messages []map[string]interface{}) int { total := 0 for _, m := range messages { total += tokenizer.NumTokensFromString(stringContent(m)) } return total } // stringContent extracts the display text from a message map, or "". // For plain messages the text lives in "content"; for multimodal messages // (with images) it lives in a text part of "user_input_multi_content". func stringContent(m map[string]interface{}) string { s, _ := m["content"].(string) if s != "" { return s } if multi, ok := m["user_input_multi_content"].([]schema.MessageInputPart); ok { for _, part := range multi { if part.Type == schema.ChatMessagePartTypeText && part.Text != "" { return part.Text } } } return "" } // setContent writes text into a message map's display field. When the // message has user_input_multi_content parts, the first text part is // updated; otherwise the plain "content" field is set. func setContent(m map[string]interface{}, text string) { if multi, ok := m["user_input_multi_content"].([]schema.MessageInputPart); ok { for i, part := range multi { if part.Type == schema.ChatMessagePartTypeText { multi[i].Text = text return } } // No existing text part – prepend one. m["user_input_multi_content"] = append( []schema.MessageInputPart{{Type: schema.ChatMessagePartTypeText, Text: text}}, multi..., ) return } m["content"] = text } // stringFrom extracts a string from inputs[name], accepting both string and // fmt.Stringer-able values. func stringFrom(inputs map[string]any, name string) (string, bool) { v, ok := inputs[name] if !ok { return "", false } if s, ok := v.(string); ok { return s, true } return "", false } // mapFrom extracts a map[string]any from inputs[name]. Accepts the // canonical map[string]any shape (the shape produced by // json.Unmarshal into a map). For OutputStructure we only need the // top-level shape — schema-validation against the inner types is // deferred to a future phase. func mapFrom(inputs map[string]any, name string) (map[string]any, bool) { v, ok := inputs[name] if !ok { return nil, false } m, ok := v.(map[string]any) return m, ok } // boolFrom extracts a bool from inputs[name]. func boolFrom(inputs map[string]any, name string) (bool, bool) { v, ok := inputs[name] if !ok { return false, false } if b, ok := v.(bool); ok { return b, true } return false, false } // floatFrom extracts a float64 from inputs[name], also accepting int. func floatFrom(inputs map[string]any, name string) (float64, bool) { v, ok := inputs[name] if !ok { return 0, false } switch x := v.(type) { case float64: return x, true case float32: return float64(x), true case int: return float64(x), true case int64: return float64(x), true } return 0, false } // intFrom extracts an int from inputs[name], also accepting float64. func intFrom(inputs map[string]any, name string) (int, bool) { v, ok := inputs[name] if !ok { return 0, false } switch x := v.(type) { case int: return x, true case int64: return int(x), true case float64: return int(x), true } return 0, false } // init registers LLMComponent with the orchestrator-owned registry. func init() { Register("LLM", func(params map[string]any) (Component, error) { var p LLMParam if v, ok := stringFrom(params, "model_id"); ok { p.ModelID = v } else if v, ok := stringFrom(params, "llm_id"); ok { p.ModelID = v } if v, ok := stringFrom(params, "system_prompt"); ok { p.SystemPrompt = v } else if v, ok := stringFrom(params, "sys_prompt"); ok { p.SystemPrompt = v } if v, ok := stringFrom(params, "user_prompt"); ok { p.UserPrompt = v } if v, ok := floatFrom(params, "temperature"); ok { f := v p.Temperature = &f } if v, ok := floatFrom(params, "top_p"); ok { f := v p.TopP = &f } if v, ok := intFrom(params, "max_tokens"); ok { i := v p.MaxTokens = &i } if v, ok := boolFrom(params, "json_output"); ok { p.JSONOutput = v } if v, ok := mapFrom(params, "output_structure"); ok { p.OutputStructure = v } if v, ok := floatFrom(params, "presence_penalty"); ok { f := v p.PresencePenalty = &f } if v, ok := floatFrom(params, "frequency_penalty"); ok { f := v p.FrequencyPenalty = &f } // cite defaults to true (matches Python) when neither LLMParam // nor inputs set it. p.Cite = true if v, ok := boolFrom(params, "cite"); ok { p.Cite = v } if v, ok := intFrom(params, "message_history_window_size"); ok { p.MessageHistoryWindowSize = v } if v, ok := mapFrom(params, "chat_template_kwargs"); ok { p.ChatTemplateKwargs = v } if v, ok := stringFrom(params, "driver"); ok { p.Driver = v } if v, ok := stringFrom(params, "api_key"); ok { p.APIKey = v } if v, ok := stringFrom(params, "base_url"); ok { p.BaseURL = v } return NewLLMComponent(p), nil }) } // cleanFormattedAnswer mirrors Python's clean_formated_answer(): // // 1. Strip everything up to and including (dotall). // 2. Strip everything up to and including ```json (dotall). // 3. Strip trailing ``` and optional newlines. // // This removes DeepSeek-R1-style thinking blocks and JSON-fence // prefixes/suffixes from the raw model response. var ( reThinkPrefix = regexp.MustCompile(`(?s)^.*`) reJSONFencePrefix = regexp.MustCompile(`(?s)^.*` + "```json") reJSONFenceSuffix = regexp.MustCompile("```\n*$") ) func cleanFormattedAnswer(ans string) string { ans = reThinkPrefix.ReplaceAllString(ans, "") ans = reJSONFencePrefix.ReplaceAllString(ans, "") ans = reJSONFenceSuffix.ReplaceAllString(ans, "") return ans }