Files
ragflow/internal/handler/agent_webhook.go
Zhichang Yu 5b09910d52 feat[Go]: port agent webhook trigger, agent file upload/download, component input-form + debug endpoints from Python (#16403)
port agent webhook trigger, agent file upload/download, component
input-form + debug endpoints from Python
- [x] New Feature (non-breaking change which adds functionality)
2026-06-27 14:07:22 +08:00

670 lines
22 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package handler
// Webhook trigger handler — Go port of
// api/apps/restful_apis/agent_api.py:1563-2248
// (`/api/v1/agents/<agent_id>/webhook` and `/.../webhook/test`).
//
// Mirrors the python reference's 9-step flow:
// 1. Load canvas (IDOR-safe via loadCanvasForUser → LoadCanvasByID).
// 2. Reject DataFlow canvas category.
// 3. Parse DSL map.
// 4. Find Begin with mode=="Webhook"; capture webhook_cfg.
// 5. Validate request method against webhook_cfg.methods.
// 6. validateWebhookSecurity (max body size / IP whitelist / rate limit
// / token / basic / jwt — strict fail-closed rate limit).
// 7. Parse request body (parseWebhookRequest).
// 8. Schema extract query/headers/body via extractBySchema.
// 9. Dispatch:
// - execution_mode == "Immediately" → return configured status +
// body_template synchronously; canvas runs detached in the
// background. Trace events appended when isTest.
// - else (streaming/aggregate) → block on canvas.run, aggregate
// message content, return JSON.
//
// Notes on Python parity divergences:
// - `cvs.dsl = json.loads(str(canvas)); UserCanvasService.update_by_id(...)`
// post-run DSL writeback is NOT ported. The Go runner mutates an
// in-memory copy, not the persisted row. UpdateAgent already persists
// the editable DSL on the user-driven path.
// - multipart/form-data uploads are rejected with HTTP 501
// (ErrWebhookMultipartNotSupported) at parse time. The Python
// upload path through canvas.get_files_async depends on
// FileService.upload_info which is itself not in the Go port yet.
// When FileService.upload_info lands, the 501 branch in Webhook
// (the ErrWebhookMultipartNotSupported case) becomes the dispatch entry point.
// - content_types whitelist ENFORCES a hard reject (102 envelope)
// when the request Content-Type does not match the configured
// value. Mirrors python agent_api.py:1839-1842 (`raise
// ValueError("Invalid Content-Type...")`).
// - Webhook trace shape (webhook-trace-<id>-logs in Redis) matches
// the python append_webhook_trace at agent_api.py:2073-2091 so the
// eventual /webhook/logs poll PR can consume this writer
// unchanged.
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"ragflow/internal/agent/canvas"
"ragflow/internal/common"
rediscli "ragflow/internal/engine/redis"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"ragflow/internal/dao"
"ragflow/internal/entity"
)
// canvasLoader is the subset of service.AgentService the webhook handler
// needs. Defined as an interface so handler tests can inject a fake
// without standing up the full AgentService (DB DAOs, eino runner, etc).
//
// LoadCanvasByID returns the raw DAO/service error so the WEBHOOK
// handler can fold it into its own 102 envelope. We deliberately do NOT
// route through handler.mapAgentError here because that helper maps
// ErrUserCanvasNotFound to 103 "Make sure you have permission..." which
// is the chat/run contract — wrong for the webhook path.
type canvasLoader interface {
// LoadCanvasByID loads the canvas identified by canvasID on behalf of
// userID. Webhook reports dao.ErrUserCanvasNotFound and
// dao.ErrUserCanvasVersionNotFound from this call as "Canvas not found."
// and every other error as an internal error.
LoadCanvasByID(ctx context.Context, userID, canvasID string) (*entity.UserCanvas, error)
// RunAgentWithWebhook starts a canvas run with the given webhook payload
// and returns a receive-only channel of run events. The callers in this
// file range over the channel, so they rely on the implementation
// closing it when the run ends.
RunAgentWithWebhook(ctx context.Context, userID, canvasID string, payload map[string]any) (<-chan canvas.RunEvent, error)
}
// Webhook serves both webhook trigger routes:
//
//	/api/v1/agents/:canvas_id/webhook      (production trigger)
//	/api/v1/agents/:canvas_id/webhook/test (test trigger, with trace)
//
// The Python decorator stack at agent_api.py:1563 binds six methods to a
// single path; on the Go side each verb is registered individually via
// registerAnyMethod (see internal/router/agent_routes.go).
//
// Every validation failure is answered through jsonError with
// common.CodeDataError, except a multipart upload, which is answered with
// HTTP 501. A request whose path ends in "/webhook/test" is treated as a
// test run and has its run events traced.
func (h *AgentHandler) Webhook(c *gin.Context) {
	user, code, msg := GetUser(c)
	if code != common.CodeSuccess {
		jsonError(c, code, msg)
		return
	}
	agentID := c.Param("canvas_id")
	if h.loader == nil {
		jsonInternalError(c, errors.New("agent webhook: loader not configured"))
		return
	}
	traced := strings.HasSuffix(c.Request.URL.Path, "/webhook/test")
	began := time.Now()

	// Step 1: load the canvas. Missing and foreign canvases share one
	// "Canvas not found." answer; anything else is an internal error.
	cv, err := h.loader.LoadCanvasByID(c.Request.Context(), user.ID, agentID)
	if err != nil {
		missing := errors.Is(err, dao.ErrUserCanvasNotFound) ||
			errors.Is(err, dao.ErrUserCanvasVersionNotFound)
		if !missing {
			jsonInternalError(c, err)
			return
		}
		jsonError(c, common.CodeDataError, "Canvas not found.")
		return
	}

	// Step 2: DataFlow canvases cannot be webhook-triggered.
	if cv.CanvasCategory == "DataFlow" {
		jsonError(c, common.CodeDataError, "Dataflow can not be triggered by webhook.")
		return
	}

	// Step 3: shallow copy of the DSL (top-level keys only; nested values
	// are still shared with cv.DSL).
	dsl := make(map[string]any, len(cv.DSL))
	for key, val := range cv.DSL {
		dsl[key] = val
	}

	// Step 4: locate the Begin component configured for webhook mode.
	cfg := findWebhookBegin(dsl)
	if cfg == nil {
		jsonError(c, common.CodeDataError, "Webhook not configured for this agent.")
		return
	}

	// Step 5: HTTP method gate.
	if method := c.Request.Method; !methodAllowed(cfg["methods"], method) {
		jsonError(c, common.CodeDataError,
			fmt.Sprintf("HTTP method '%s' not allowed for this webhook.", method))
		return
	}

	// Step 6: security gate; any failure is reported as a data error.
	security := stringMap(cfg["security"])
	if secErr := validateWebhookSecurity(security, c, agentID); secErr != nil {
		jsonError(c, common.CodeDataError, secErr.Error())
		return
	}

	// Step 6a: cap the actual body stream, not just the (client-supplied)
	// Content-Length header, so reads performed while parsing the request
	// are bounded by the configured limit.
	if limit, limitErr := parseMaxBodySize(security); limitErr == nil && limit > 0 && c.Request.Body != nil {
		c.Request.Body = http.MaxBytesReader(c.Writer, c.Request.Body, limit)
	}

	// Step 7: parse the request. Multipart uploads get HTTP 501; every
	// other parse failure uses the regular data-error envelope.
	wantType, _ := cfg["content_types"].(string)
	parsed, parseErr := parseWebhookRequest(wantType, c)
	if parseErr != nil {
		if errors.Is(parseErr, ErrWebhookMultipartNotSupported) {
			c.JSON(http.StatusNotImplemented, gin.H{
				"code":    http.StatusNotImplemented,
				"data":    false,
				"message": parseErr.Error(),
			})
			return
		}
		jsonError(c, common.CodeDataError, parseErr.Error())
		return
	}

	// Step 8: schema-driven extraction of query / headers / body.
	schema, _ := cfg["schema"].(map[string]any)
	clean, schemaErr := applyWebhookSchema(parsed, schema)
	if schemaErr != nil {
		jsonError(c, common.CodeDataError, schemaErr.Error())
		return
	}

	// Step 9: dispatch. An unset execution_mode means "Immediately".
	mode, _ := cfg["execution_mode"].(string)
	if mode != "" && mode != "Immediately" {
		// Blocking mode: wait for the run and return its aggregate.
		out := h.runWebhookSync(c.Request.Context(), cv, clean, traced, began)
		c.JSON(out.status, out.body)
		return
	}

	status, respType, payload, renderErr := renderImmediatelyResponse(stringMap(cfg["response"]))
	if renderErr != nil {
		jsonError(c, common.CodeDataError, renderErr.Error())
		return
	}
	// The run is started on its own goroutine and is not handed the
	// request context, so a client disconnect does not cancel it.
	go h.runWebhookDetached(cv, clean, traced, began)
	c.Data(status, respType, payload)
}
// findWebhookBegin looks through dsl["components"] for a component whose
// obj.component_name is "begin" (case-insensitive) and whose
// obj.params.mode is exactly "Webhook". It returns that params map (the
// webhook_cfg), or nil when no such component exists or the DSL does not
// have the expected shape. Mirrors agent_api.py:1584-1592.
func findWebhookBegin(dsl map[string]any) map[string]any {
	// Indexing a nil map is safe, so a nil dsl simply yields no components.
	components, _ := dsl["components"].(map[string]any)
	for _, node := range components {
		entry, _ := node.(map[string]any)
		obj, _ := entry["obj"].(map[string]any)
		if name, _ := obj["component_name"].(string); !strings.EqualFold(name, "begin") {
			continue
		}
		params, isMap := obj["params"].(map[string]any)
		if !isMap || params == nil {
			continue
		}
		mode, _ := params["mode"].(string)
		if mode != "Webhook" {
			continue
		}
		return params
	}
	return nil
}
// methodAllowed reports whether requestMethod appears in the configured
// method list. The comparison is case-insensitive.
//
// raw is the webhook config's "methods" value. Both []any (the shape
// produced by JSON decoding) and []string are accepted; a []string list
// used to be treated as "not configured", which disabled the gate. Any
// other type, or an empty list, means no restriction is configured and
// every method is allowed (matches python: agent_api.py:1596
// short-circuit). Non-string elements of a []any list never match a
// non-empty request method.
func methodAllowed(raw any, requestMethod string) bool {
	var methods []string
	switch list := raw.(type) {
	case []string:
		methods = list
	case []any:
		methods = make([]string, 0, len(list))
		for _, m := range list {
			s, _ := m.(string) // non-strings become "" and will not match
			methods = append(methods, s)
		}
	}
	if len(methods) == 0 {
		return true
	}
	for _, m := range methods {
		if strings.EqualFold(m, requestMethod) {
			return true
		}
	}
	return false
}
// stringMap coerces v to a map[string]any. A value whose dynamic type is
// exactly map[string]any is returned as-is (not copied); anything else,
// including nil, yields a fresh empty map so callers can index the result
// without a type check.
func stringMap(v any) map[string]any {
	switch m := v.(type) {
	case map[string]any:
		return m
	default:
		return map[string]any{}
	}
}
// parseWebhookRequest mirrors agent_api.py:1828-1894.
//
// It returns a map with four entries:
//
//   - "query":        URL query parameters
//   - "headers":      request headers
//   - "body":         decoded request body (map[string]any, never nil)
//   - "content_type": lower-cased media type without parameters
//
// For query, headers and form fields a key with exactly one value maps to
// that string; otherwise it maps to the []string of values.
//
// The body is decoded according to the request Content-Type:
//
//   - application/x-www-form-urlencoded → form fields
//   - anything else (JSON, text/plain, octet-stream, unknown, empty) →
//     the raw bytes are decoded as a JSON object on a best-effort basis;
//     a body that is not a JSON object yields an empty map.
//
// Errors:
//
//   - ErrWebhookMultipartNotSupported: the request is
//     multipart/form-data. Checked before the whitelist; the handler
//     maps it to HTTP 501.
//   - ErrWebhookContentTypeMismatch (wrapped): configuredContentType is
//     set and the request media type differs from it. Media types are
//     compared case-insensitively. A request without a Content-Type
//     header does NOT pass a configured whitelist (the Python reference
//     lets it through; this port is deliberately stricter). The check
//     runs before the body is read, so a rejected request costs no body
//     I/O.
//   - a wrapped read error: the body could not be read completely, e.g.
//     it exceeded a limit installed with http.MaxBytesReader. Previously
//     such errors were discarded and the request was processed with an
//     empty body.
func parseWebhookRequest(configuredContentType string, c *gin.Context) (map[string]any, error) {
	// flatten converts multi-valued HTTP data into the single-or-list
	// shape described above.
	flatten := func(src map[string][]string) map[string]any {
		out := make(map[string]any, len(src))
		for k, vals := range src {
			if len(vals) == 1 {
				out[k] = vals[0]
			} else {
				out[k] = vals
			}
		}
		return out
	}

	req := c.Request
	query := flatten(req.URL.Query())
	headers := flatten(req.Header)

	// Media type only: drop parameters such as "; charset=utf-8".
	ctype, _, _ := strings.Cut(c.GetHeader("Content-Type"), ";")
	ctype = strings.ToLower(strings.TrimSpace(ctype))

	// multipart/form-data → 501. Checked BEFORE the whitelist because the
	// upload path it would need is not ported yet.
	if ctype == "multipart/form-data" {
		return nil, ErrWebhookMultipartNotSupported
	}

	// Content-Type whitelist; an empty configured value disables it.
	if configuredContentType != "" {
		want := strings.ToLower(strings.TrimSpace(configuredContentType))
		if want != ctype {
			return nil, fmt.Errorf("%w: expect %q, got %q",
				ErrWebhookContentTypeMismatch, configuredContentType, ctype)
		}
	}

	body := map[string]any{}
	switch ctype {
	case "application/x-www-form-urlencoded":
		err := req.ParseForm()
		// An over-limit body must not be mistaken for an empty form.
		// Other ParseForm errors (malformed encoding) keep the original
		// best-effort behavior: the body stays empty.
		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			return nil, fmt.Errorf("reading request body: %w", err)
		}
		if err == nil {
			body = flatten(req.PostForm)
		}
	default:
		if req.Body == nil {
			break
		}
		raw, err := io.ReadAll(req.Body)
		if err != nil {
			return nil, fmt.Errorf("reading request body: %w", err)
		}
		if len(raw) > 0 {
			// Best effort: invalid JSON or a non-object document leaves
			// the body empty. A literal `null` would set the map to nil,
			// so normalize that case as well.
			if err := json.Unmarshal(raw, &body); err != nil || body == nil {
				body = map[string]any{}
			}
		}
	}

	return map[string]any{
		"query":        query,
		"headers":      headers,
		"body":         body,
		"content_type": ctype,
	}, nil
}
// Sentinel errors returned by parseWebhookRequest. Match them with
// errors.Is; ErrWebhookContentTypeMismatch is returned wrapped.
var (
	// ErrWebhookMultipartNotSupported signals a multipart/form-data
	// request. The handler answers it with HTTP 501 Not Implemented
	// because the Python file upload path (FileService.upload_info →
	// canvas.get_files_async) is not ported yet.
	ErrWebhookMultipartNotSupported = errors.New("multipart/form-data uploads are not supported in this port")

	// ErrWebhookContentTypeMismatch signals that the request
	// Content-Type disagrees with the webhook's configured content_types
	// value. Mirrors python agent_api.py:1839-1842 (`raise
	// ValueError("Invalid Content-Type...")`). The handler reports it
	// through the regular 102 envelope.
	ErrWebhookContentTypeMismatch = errors.New("invalid content-type")
)
// applyWebhookSchema runs extractBySchema on the parsed request's three
// sections and assembles the clean_request map that the Python
// webhook handler passes to canvas.run(webhook_payload=...). Mirrors
// agent_api.py:2052-2068.
func applyWebhookSchema(parsed map[string]any, schema map[string]any) (map[string]any, error) {
q := stringMap(parsed["query"])
hd := stringMap(parsed["headers"])
bd := stringMap(parsed["body"])
qClean, err := extractBySchema(q, stringMap(schema["query"]), "query")
if err != nil {
return nil, err
}
hdClean, err := extractBySchema(hd, stringMap(schema["headers"]), "headers")
if err != nil {
return nil, err
}
bdClean, err := extractBySchema(bd, stringMap(schema["body"]), "body")
if err != nil {
return nil, err
}
return map[string]any{
"query": qClean,
"headers": hdClean,
"body": bdClean,
"input": parsed,
}, nil
}
// renderImmediatelyResponse builds the synchronous response for the
// Immediately execution mode. Mirrors agent_api.py:2093-2121.
//
// cfg is the webhook config's "response" section:
//
//   - "status": int, int64, float64 (truncated toward zero) or a decimal
//     string. Defaults to 200 when absent. Any other type, an unparsable
//     string, or a value outside [200, 399] is an error so the operator
//     notices a config bug.
//   - "body_template": a string. Empty or absent → no body with content
//     type application/json. If the whole string is a single JSON
//     document it is re-encoded and served as application/json;
//     otherwise it is served verbatim as text/plain.
//
// JSON numbers in the template are carried through as json.Number rather
// than float64, so integers beyond 2^53 are not corrupted by the
// re-encoding.
//
// Returns (status, content type, body, error).
func renderImmediatelyResponse(cfg map[string]any) (int, string, []byte, error) {
	status := 200
	if statusRaw, ok := cfg["status"]; ok {
		switch v := statusRaw.(type) {
		case int:
			status = v
		case int64:
			status = int(v)
		case float64:
			status = int(v)
		case string:
			n, err := strconv.Atoi(v)
			if err != nil {
				return 0, "", nil, fmt.Errorf("invalid response status code: %v", v)
			}
			status = n
		default:
			return 0, "", nil, fmt.Errorf("invalid response status code: %v", v)
		}
	}
	if status < 200 || status > 399 {
		return 0, "", nil, fmt.Errorf("invalid response status code: %d, must be between 200 and 399", status)
	}

	bodyTpl, _ := cfg["body_template"].(string)
	if bodyTpl == "" {
		return status, "application/json", nil, nil
	}

	// Try JSON first (python: parse_body() branch at line 2110).
	dec := json.NewDecoder(strings.NewReader(bodyTpl))
	dec.UseNumber()
	var doc any
	if err := dec.Decode(&doc); err == nil {
		// Decode stops after the first value; the template only counts
		// as JSON when nothing but whitespace follows it.
		if _, err := dec.Token(); errors.Is(err, io.EOF) {
			if encoded, err := json.Marshal(doc); err == nil {
				return status, "application/json", encoded, nil
			}
		}
	}
	return status, "text/plain", []byte(bodyTpl), nil
}
// runWebhookDetached runs the canvas in the background for the
// Immediately execution mode. It uses context.Background() with a
// 5-minute timeout (NOT the request context) so a client disconnect does
// NOT cancel the run.
//
// When isTest is true every run event is appended to the webhook trace,
// followed by a terminal "finished" event — {"success": true} once the
// event channel is drained, or an "error" event plus
// {"success": false} when the run cannot be started. This matches the
// trace written by runWebhookSync, so both execution modes leave a
// terminal marker in the trace.
//
// A start failure is logged as a warning; nothing is returned because the
// HTTP response has already been produced by the caller.
//
// Mirrors python: agent_api.py:2123-2175 (the asyncio.create_task body
// inside the Immediately branch).
func (h *AgentHandler) runWebhookDetached(
	cv *entity.UserCanvas, payload map[string]any, isTest bool, startTs time.Time,
) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// trace is a no-op outside test runs.
	trace := func(ev canvas.RunEvent) {
		if isTest {
			appendWebhookTrace(cv.ID, startTs, ev)
		}
	}
	finish := func(success bool) {
		if isTest {
			trace(canvas.RunEvent{Type: "finished", Data: mustJSON(map[string]any{"success": success})})
		}
	}

	events, err := h.loader.RunAgentWithWebhook(ctx, cv.UserID, cv.ID, payload)
	if err != nil {
		common.Warn("webhook detached run start failed",
			zap.String("canvas", cv.ID),
			zap.Error(err))
		if isTest {
			trace(canvas.RunEvent{Type: "error", Data: mustJSON(map[string]any{"message": err.Error()})})
		}
		finish(false)
		return
	}

	// Drain the channel even when not tracing; this loop ends when the
	// producer closes the channel.
	for ev := range events {
		trace(ev)
	}
	finish(true)
}
// webhookSyncResult is the outcome of runWebhookSync: the HTTP status
// code to send and the value to serialize as the JSON response body.
type webhookSyncResult struct {
	status int
	body   any
}

// runWebhookSync drives the canvas in the non-Immediately (blocking)
// mode and returns the HTTP status + body to send. Mirrors
// agent_api.py:2178-2247 with the python sse() coroutine flattened into
// a synchronous loop.
//
// Behavior:
//
//   - If the run cannot be started the result is HTTP 400 with
//     {"code": 400, "message": <error>, "success": false}.
//   - Otherwise the event channel is consumed until it is closed. The
//     content of every "message" event is concatenated in arrival order;
//     an event flagged start_to_think / end_to_think contributes
//     "<think>" / "</think>" instead of its content.
//   - A "message_end" event carrying a "status" field overrides the
//     response status (default 200). Values outside 100–999 are ignored
//     because net/http panics when asked to write such a status code.
//   - The result body is {"message": <text>, "success": true,
//     "code": <status>}.
//
// When isTest is true every event, plus a terminal "finished" event
// (and an "error" event on start failure), is appended to the webhook
// trace.
func (h *AgentHandler) runWebhookSync(
	ctx context.Context, cv *entity.UserCanvas, payload map[string]any,
	isTest bool, startTs time.Time,
) webhookSyncResult {
	events, err := h.loader.RunAgentWithWebhook(ctx, cv.UserID, cv.ID, payload)
	if err != nil {
		if isTest {
			appendWebhookTrace(cv.ID, startTs, canvas.RunEvent{Type: "error", Data: mustJSON(map[string]any{"message": err.Error()})})
			appendWebhookTrace(cv.ID, startTs, canvas.RunEvent{Type: "finished", Data: mustJSON(map[string]any{"success": false})})
		}
		return webhookSyncResult{status: http.StatusBadRequest, body: gin.H{
			"code":    400,
			"message": err.Error(),
			"success": false,
		}}
	}

	status := http.StatusOK
	var reply strings.Builder
	for ev := range events {
		if isTest {
			appendWebhookTrace(cv.ID, startTs, ev)
		}
		switch ev.Type {
		case "message":
			var msg struct {
				Content      string `json:"content"`
				StartToThink bool   `json:"start_to_think"`
				EndToThink   bool   `json:"end_to_think"`
			}
			// Events whose data does not decode are skipped.
			if json.Unmarshal([]byte(ev.Data), &msg) != nil {
				continue
			}
			switch {
			case msg.StartToThink:
				reply.WriteString("<think>")
			case msg.EndToThink:
				reply.WriteString("</think>")
			default:
				reply.WriteString(msg.Content)
			}
		case "message_end":
			var end struct {
				Status *int `json:"status"`
			}
			if json.Unmarshal([]byte(ev.Data), &end) != nil || end.Status == nil {
				continue
			}
			// Only accept codes the HTTP server can actually write.
			if code := *end.Status; code >= 100 && code <= 999 {
				status = code
			}
		}
	}

	if isTest {
		appendWebhookTrace(cv.ID, startTs, canvas.RunEvent{Type: "finished", Data: mustJSON(map[string]any{"success": true})})
	}
	return webhookSyncResult{status: status, body: gin.H{
		"message": reply.String(),
		"success": true,
		"code":    status,
	}}
}
// mustJSON encodes v as JSON and returns the text, including the
// trailing newline that json.Encoder.Encode appends. Used by the trace
// appenders.
//
// Despite the name it does not panic: if v cannot be encoded the result
// is the empty string. In this file it is only called with
// map[string]any literals of strings and bools, which always encode.
func mustJSON(v any) string {
	out := new(bytes.Buffer)
	if err := json.NewEncoder(out).Encode(v); err != nil {
		// Nothing is written to the buffer when encoding fails.
		return ""
	}
	return out.String()
}
// appendWebhookTrace appends a single RunEvent to the per-canvas trace
// key in Redis. Mirrors python's append_webhook_trace at
// agent_api.py:2073-2091.
//
// The trace key is `webhook-trace-<agent_id>-logs`, written with a 600 s
// TTL. The stored document has the shape
//
//	{"webhooks": {"<start_ts>": {"start_ts": <float>, "events": [...]}}}
//
// where <start_ts> is startTs in fractional Unix seconds, and each event
// is recorded as {"ts": <float>, "event": <type>, ...} with "data",
// "message_id", "task_id" and "session_id" included only when non-empty.
// Event data that is valid JSON is embedded as JSON; anything else is
// stored as a plain string so that one malformed payload cannot make the
// whole document unencodable and drop the event.
//
// The function is a no-op when no Redis client is available. Failures
// are best-effort: an unreadable or malformed stored document is replaced
// by a fresh one.
//
// NOTE(review): the read-modify-write below is not atomic, so concurrent
// appends to the same key can lose events. Fixing that needs support from
// the Redis client and is out of scope here.
func appendWebhookTrace(agentID string, startTs time.Time, ev canvas.RunEvent) {
	rdb := rediscli.Get()
	if rdb == nil {
		return
	}
	key := fmt.Sprintf("webhook-trace-%s-logs", agentID)

	// The Get error is deliberately ignored: presumably a missing key is
	// reported as an error, and that is the normal first-event case.
	raw, _ := rdb.Get(key)
	obj := map[string]any{}
	if raw != "" {
		// A malformed document is discarded. A stored `null` would leave
		// obj nil, and writing to a nil map panics, so reset in that case
		// too.
		if err := json.Unmarshal([]byte(raw), &obj); err != nil || obj == nil {
			obj = map[string]any{}
		}
	}

	whs, _ := obj["webhooks"].(map[string]any)
	if whs == nil {
		whs = map[string]any{}
		obj["webhooks"] = whs
	}

	startSec := float64(startTs.UnixNano()) / 1e9
	entryKey := strconv.FormatFloat(startSec, 'f', -1, 64)
	entry, _ := whs[entryKey].(map[string]any)
	if entry == nil {
		entry = map[string]any{
			"start_ts": startSec,
			"events":   []any{},
		}
		whs[entryKey] = entry
	}
	events, _ := entry["events"].([]any)

	eventRecord := map[string]any{
		"ts":    float64(time.Now().UnixNano()) / 1e9,
		"event": ev.Type,
	}
	if ev.Data != "" {
		if json.Valid([]byte(ev.Data)) {
			eventRecord["data"] = json.RawMessage(ev.Data)
		} else {
			eventRecord["data"] = ev.Data
		}
	}
	if ev.MessageID != "" {
		eventRecord["message_id"] = ev.MessageID
	}
	if ev.TaskID != "" {
		eventRecord["task_id"] = ev.TaskID
	}
	if ev.SessionID != "" {
		eventRecord["session_id"] = ev.SessionID
	}
	entry["events"] = append(events, eventRecord)

	encoded, err := json.Marshal(obj)
	if err != nil {
		common.Warn("webhook trace marshal failed", zap.Error(err))
		return
	}
	// NOTE(review): the document is passed to SetObj as an already-encoded
	// string; confirm SetObj does not JSON-encode it a second time.
	rdb.SetObj(key, string(encoded), 600*time.Second)
}