//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package component — Invoke component (T3).
//
// Invoke is the canvas HTTP client node. It supports GET/POST/
// PUT/DELETE with custom headers, optional proxy, and per-request
// timeout, and wraps the underlying net/http.Transport with
// go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp
// .NewTransport so outbound calls automatically propagate W3C
// traceparent headers.
package component
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
// Defaults and limits used by the Invoke component.
const (
	// componentNameInvoke is the name the component is registered under
	// and the value reported by Name().
	componentNameInvoke = "Invoke"

	// defaultInvokeTimeout applies when no positive "timeout" input is
	// supplied.
	defaultInvokeTimeout = time.Second * 30

	// defaultInvokeUserAgent is sent as the User-Agent header unless the
	// caller overrides it through the "headers" input.
	defaultInvokeUserAgent = "ragflow-agent/1.0 (Invoke component)"

	// defaultInvokeContentCT is the Content-Type used for POST/PUT when
	// the caller does not provide one.
	defaultInvokeContentCT = "application/json"

	// maxInvokeResponseBody is the hard cap (16 MiB) on how many response
	// bytes are read into memory, to avoid OOM.
	maxInvokeResponseBody = 16 * 1024 * 1024
)
// InvokeComponent is the HTTP client node. Stateless across invocations:
// the only field is the component name, which is set at construction.
type InvokeComponent struct {
// name is the component name returned by Name().
name string
}
// NewInvokeComponent builds a ready-to-use Invoke component. The
// configuration map is accepted for constructor-signature compatibility
// but is not consulted; the returned error is always nil.
func NewInvokeComponent(_ map[string]any) (Component, error) {
	c := new(InvokeComponent)
	c.name = componentNameInvoke
	return c, nil
}
// Name reports the name stored on the component at construction time.
func (i *InvokeComponent) Name() string {
	return i.name
}
// Invoke executes a single HTTP request and returns the status code,
// body, and response headers. See Inputs() for the param contract.
//
// Inputs read from the map:
//   - "method" (string): GET, POST, PUT, or DELETE; case-insensitive,
//     surrounding whitespace ignored. Required.
//   - "url" (string): absolute http(s) URL. Required.
//   - "timeout" (int, int64, or float64 seconds): per-request timeout;
//     non-positive or missing values fall back to defaultInvokeTimeout.
//   - "content_type" (string): defaults to defaultInvokeContentCT for
//     POST/PUT when empty.
//   - "body" (string): optional request body.
//   - "headers" (map[string]any or map[string]string): extra request
//     headers; non-string values in a map[string]any are skipped.
//   - "proxy" (string): optional proxy URL.
//   - "clean_html" (bool): strip HTML tags from the response body.
//   - "datatype" (string): passed through to the output; when empty it
//     is inferred from the response Content-Type.
//
// The result map holds "status" (int), "body" (string, silently
// truncated at maxInvokeResponseBody bytes), "headers"
// (map[string]string, first value per key), and "datatype" (string).
//
// An error is returned for an invalid method, a missing or non-http(s)
// URL, a malformed proxy URL, or any failure while building, sending,
// or reading the request. Non-2xx statuses are NOT errors; they are
// reported through "status".
func (i *InvokeComponent) Invoke(ctx context.Context, inputs map[string]any) (map[string]any, error) {
	method, _ := inputs["method"].(string)
	method = strings.ToUpper(strings.TrimSpace(method))
	switch method {
	case http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete:
	default:
		return nil, fmt.Errorf("Invoke: invalid method %q (want GET/POST/PUT/DELETE)", method)
	}

	rawURL, _ := inputs["url"].(string)
	if rawURL == "" {
		return nil, errors.New("Invoke: url is required")
	}
	// We trust the orchestrator to have already resolved any {{...}}
	// refs, but a bad string here is a programmer error worth surfacing.
	// url.Parse alone accepts almost anything, so also insist on an
	// absolute http(s) URL rather than failing later inside client.Do.
	target, err := url.Parse(rawURL)
	if err != nil {
		return nil, fmt.Errorf("Invoke: parse url: %w", err)
	}
	if (target.Scheme != "http" && target.Scheme != "https") || target.Host == "" {
		return nil, fmt.Errorf("Invoke: url %q must be an absolute http or https URL", rawURL)
	}

	timeout := invokeTimeoutFrom(inputs["timeout"])

	contentType, _ := inputs["content_type"].(string)
	if contentType == "" && (method == http.MethodPost || method == http.MethodPut) {
		contentType = defaultInvokeContentCT
	}

	var body io.Reader
	if s, ok := inputs["body"].(string); ok && s != "" {
		body = bytes.NewReader([]byte(s))
	}

	req, err := http.NewRequestWithContext(ctx, method, rawURL, body)
	if err != nil {
		return nil, fmt.Errorf("Invoke: build request: %w", err)
	}
	if contentType != "" {
		req.Header.Set("Content-Type", contentType)
	}
	req.Header.Set("User-Agent", defaultInvokeUserAgent)
	// Caller headers are applied last so they can override the defaults
	// set above.
	switch h := inputs["headers"].(type) {
	case map[string]any:
		for k, v := range h {
			if s, ok := v.(string); ok {
				req.Header.Set(k, s)
			}
		}
	case map[string]string:
		for k, v := range h {
			req.Header.Set(k, v)
		}
	}

	// Optional proxy support: if inputs["proxy"] is set, build a
	// dedicated Transport that uses it. This avoids mutating the
	// global http.DefaultTransport (which would also affect unrelated
	// components in the same process).
	var base http.RoundTripper = http.DefaultTransport
	if proxyStr, ok := inputs["proxy"].(string); ok && proxyStr != "" {
		proxyURL, err := parseInvokeProxy(proxyStr)
		if err != nil {
			return nil, err
		}
		dedicated := &http.Transport{Proxy: http.ProxyURL(proxyURL)}
		// The dedicated Transport lives for this call only. Without
		// this, its keep-alive connections would stay open after the
		// call returns. Deferred before resp.Body.Close() so it runs
		// after the body has been released.
		defer dedicated.CloseIdleConnections()
		base = dedicated
	}

	client := &http.Client{
		Timeout: timeout,
		// Wrap the Transport with otelhttp so the request gets a child
		// span + W3C traceparent injected automatically.
		Transport: otelhttp.NewTransport(base),
	}

	// The Invoke component is a generic HTTP client node in the canvas
	// DSL — operators wire it to arbitrary endpoints. SSRF surface is
	// limited to operators (not end users), and outbound traffic is
	// bounded by the client timeout + maxInvokeResponseBody cap.
	// codeql[go/request-forgery] Intentional: operator-configured target.
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("Invoke: do: %w", err)
	}
	defer resp.Body.Close()

	// Cap the response body to keep a hostile server from streaming
	// infinite bytes into memory. Anything past the cap is dropped.
	bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, maxInvokeResponseBody))
	if err != nil {
		return nil, fmt.Errorf("Invoke: read body: %w", err)
	}

	hdr := make(map[string]string, len(resp.Header))
	for k, vs := range resp.Header {
		// First value only — multi-value headers are uncommon in
		// canvas-DSL HTTP responses, and the param contract specifies
		// a string map.
		if len(vs) > 0 {
			hdr[k] = vs[0]
		}
	}

	bodyStr := string(bodyBytes)
	// Clean HTML from response body when clean_html input is set.
	if cleanHTML, _ := inputs["clean_html"].(bool); cleanHTML {
		bodyStr = stripHTMLTags(bodyStr)
	}

	// The datatype is only a label handed to downstream nodes; the body
	// itself is always returned as a string and is not parsed here.
	datatype, _ := inputs["datatype"].(string)
	if datatype == "" {
		// Infer from the Content-Type header. Media types are
		// case-insensitive, so compare in lower case.
		ct := strings.ToLower(resp.Header.Get("Content-Type"))
		if strings.Contains(ct, "application/json") {
			datatype = "json"
		} else {
			datatype = "text"
		}
	}

	return map[string]any{
		"status":   resp.StatusCode,
		"body":     bodyStr,
		"headers":  hdr,
		"datatype": datatype,
	}, nil
}

// invokeTimeoutFrom converts the raw "timeout" input (seconds, as int,
// int64, or float64) into a Duration. Missing, non-numeric, NaN, and
// non-positive values yield defaultInvokeTimeout. Fractional seconds are
// honoured rather than truncated to zero (a zero http.Client.Timeout
// would mean "no timeout"), and values too large for a Duration are
// clamped instead of overflowing.
func invokeTimeoutFrom(raw any) time.Duration {
	const (
		maxTimeout      = time.Duration(1<<63 - 1)
		maxWholeSeconds = int64(maxTimeout / time.Second)
	)

	var whole int64
	switch v := raw.(type) {
	case int:
		whole = int64(v)
	case int64:
		whole = v
	case float64:
		if !(v > 0) { // also rejects NaN
			return defaultInvokeTimeout
		}
		if v >= float64(maxWholeSeconds) {
			return maxTimeout
		}
		d := time.Duration(v * float64(time.Second))
		if d <= 0 { // sub-nanosecond request
			return defaultInvokeTimeout
		}
		return d
	default:
		return defaultInvokeTimeout
	}

	if whole <= 0 {
		return defaultInvokeTimeout
	}
	if whole > maxWholeSeconds {
		return maxTimeout
	}
	return time.Duration(whole) * time.Second
}

// parseInvokeProxy parses a proxy URL for a single request, returning an
// error (never panicking) when the string does not parse or names no host.
func parseInvokeProxy(raw string) (*url.URL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, fmt.Errorf("Invoke: invalid proxy URL %q: %w", raw, err)
	}
	if u.Host == "" {
		return nil, fmt.Errorf("Invoke: proxy URL %q has no host", raw)
	}
	return u, nil
}
// Stream adapts Invoke to the streaming interface: it runs the request
// to completion, then hands back a channel that yields the single
// result map and is already closed. If Invoke fails, the error is
// returned and no channel is created. Real streaming (chunked transfer
// as it arrives) is a future enhancement.
func (i *InvokeComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
	result, err := i.Invoke(ctx, inputs)
	if err != nil {
		return nil, err
	}

	// Capacity 1 lets the send below complete without a receiver.
	results := make(chan map[string]any, 1)
	defer close(results)
	results <- result
	return results, nil
}
// Inputs returns the public parameter surface: a map from input name to
// a human-readable description of that input.
func (i *InvokeComponent) Inputs() map[string]string {
	return map[string]string{
		"method":       "HTTP method: GET, POST, PUT, or DELETE (case-insensitive).",
		"url":          "Target URL; can be a {{...}} reference resolved upstream.",
		"headers":      "Optional map of string headers.",
		"body":         "Optional request body (string).",
		"timeout":      "Per-request timeout in seconds; default 30.",
		"proxy":        "Optional proxy URL (e.g. http://host:3128).",
		"content_type": "Optional Content-Type; default 'application/json' for POST/PUT.",
		"clean_html":   "When true, strip HTML tags from the response body.",
		// Invoke does not default this to 'json': when it is empty the
		// value is inferred from the response Content-Type, falling back
		// to 'text'.
		"datatype": "Expected response datatype: 'json', 'text', or 'html'. When empty, inferred from the response Content-Type ('json' or 'text').",
		// NOTE(review): Invoke never reads "variables"; presumably it is
		// consumed by the orchestrator before Invoke runs — confirm.
		"variables": "Optional template variables for URL/body interpolation.",
	}
}
// Outputs describes the keys of the result map produced by Invoke,
// mapping each key to a human-readable description.
func (i *InvokeComponent) Outputs() map[string]string {
	surface := make(map[string]string, 4)
	surface["status"] = "HTTP status code (int)."
	surface["body"] = "Response body (string, truncated at 16 MiB)."
	surface["headers"] = "Response headers (first value per key)."
	surface["datatype"] = "Inferred response datatype: 'json' | 'text' | 'html'."
	return surface
}
// mustParseProxy turns a proxy URL string into a *url.URL and panics
// when the string cannot be used. Following the Must-helper convention,
// it treats a bad proxy string as a deployment error: operators
// configure the value, so a malformed one fails loudly.
//
// It panics with a descriptive message when url.Parse rejects the
// string, or when the parsed URL has an empty Host and therefore names
// no proxy server.
func mustParseProxy(raw string) *url.URL {
	parsed, parseErr := url.Parse(raw)
	switch {
	case parseErr != nil:
		panic(fmt.Sprintf("Invoke: invalid proxy URL %q: %v", raw, parseErr))
	case parsed.Host == "":
		panic(fmt.Sprintf("Invoke: proxy URL %q has no host", raw))
	}
	return parsed
}
// stripHTMLTags removes HTML tags from the input string. This is a
// best-effort implementation: it deletes every span that runs from a
// '<' to the next '>' (inclusive) using plain string scanning. It is
// NOT a full HTML sanitizer and should only be used for cleaning up
// response text for consumption by downstream LLM nodes.
//
// A '<' with no later '>' ends tag removal; it and everything after it
// are kept verbatim. After tag removal, runs of three or more newlines
// are collapsed to exactly two and surrounding whitespace is trimmed.
//
// The scan is a single left-to-right pass, so cost is linear in the
// input size even for multi-megabyte bodies with many tags.
//
// NOTE(review): the original comment says this mirrors Python's
// `strip_html_tags` helper (invoke.py) — not verifiable from this file.
func stripHTMLTags(s string) string {
	cleaned := s
	if strings.IndexByte(s, '<') != -1 {
		var kept strings.Builder
		kept.Grow(len(s))
		rest := s
		for {
			open := strings.IndexByte(rest, '<')
			if open == -1 {
				break
			}
			// Offset of the closing '>' relative to the opening '<'.
			closing := strings.IndexByte(rest[open:], '>')
			if closing == -1 {
				break
			}
			kept.WriteString(rest[:open])
			rest = rest[open+closing+1:]
		}
		kept.WriteString(rest)
		cleaned = kept.String()
	}

	// Collapse multiple newlines: repeat until no run of three remains,
	// which leaves every long run at exactly two.
	for strings.Contains(cleaned, "\n\n\n") {
		cleaned = strings.ReplaceAll(cleaned, "\n\n\n", "\n\n")
	}
	return strings.TrimSpace(cleaned)
}
// The blank assignment below references the net package only so that
// its import stays in use: no other code visible in this file refers to
// net, and the Go compiler rejects a file with an unused import. The
// proxy path does not need it — http.ProxyURL is called with a *url.URL
// and requires no net identifier here.
var _ = net.IPv4len
// init passes the Invoke component's name and constructor to Register
// when the package is loaded.
func init() {
Register(componentNameInvoke, NewInvokeComponent)
}