Files
ragflow/internal/service/bot_completion.go
Zhichang Yu dfe2dc346d feat[Go]: port agent attachment download, chatbot + agentbot completion/info endpoints from Python (#16405)
## Summary

Ports five Python agent APIs to Go under the v1 Gin router:

- `GET  /api/v1/agents/attachments/<attachment_id>/download`
- `POST /api/v1/chatbots/<dialog_id>/completions`  (SSE)
- `GET  /api/v1/chatbots/<dialog_id>/info`
- `POST /api/v1/agentbots/<agent_id>/completions` (SSE)
- `GET  /api/v1/agentbots/<agent_id>/inputs`

Mirrors the existing Python wire shape (`{code, message,
data:{answer,reference,...}}` per Python `canvas_service.completion`) so
the iframe SDK and existing JS widgets keep working.

## Behavioural parity with Python

| # | Concern | How it's met |
|---|---------|--------------|
| R0 | Bot routes must not require regular user session | Routes mount
on `apiNoAuth` (router.go:198-202), with `BetaAuthMiddleware` only |
| R3 | Two SSE formats in Go drift | F2: `AgentChatCompletions` and
`AgentbotCompletion` share `service.WriteChatbotRunEvent` |
| R7 | `GetBySessionID` returns `(nil, nil)` on miss | Defensive
nil-check before `session.UserID != tenantID` |
| R8 | Begin component name vs ID | `FindBeginComponentID` resolves name
→ ID first, then `ExtractComponentInputForm(dsl, beginID)` |
| R9 | Defensive PromptConfig parsing | `stringFromMap` helper used for
`prologue` and `tavily_api_key` |
| R10 | `BetaAuthMiddleware` Bearer-prefix pre-filter | Removed —
`GetUserByToken` is called unconditionally, falls back to
`GetUserByBetaAPIToken` |
| F8 | Multi-turn chatbot history | `ChatbotCompletion` reads prior
turns from `session.Message`, appends user turn, calls LLM, persists new
pair via new `API4ConversationDAO.Update` |
| F9 | UUID gate stricter than plan | Removed — only `filepath.Base` +
CR/LF/quote header sanitization remains |
| H2 | Defence-in-depth IDOR | `AgentbotCompletion` calls `loadCanvas`
before delegating to `RunAgent` |
| M2 | SSE error leakage | `WriteChatbotFrame` emits generic `"an
internal error occurred"`; real error logged via `common.Error` |

## Verification

```bash
$ go vet ./...                                     # clean (only pre-existing issues)
$ go build ./...                                   # success
$ go test ./internal/handler/ ./internal/service/ ./internal/agent/dsl/ ./internal/common/ ./internal/dao/
ok  ragflow/internal/handler     0.617s
ok  ragflow/internal/service     1.729s
ok  ragflow/internal/agent/dsl   0.008s
ok  ragflow/internal/common      0.087s
ok  ragflow/internal/dao         0.083s
```

1199 tests pass across 5 packages.

## Known follow-ups (out of scope for this PR)

- **F1**: token-level streaming in `ChatbotCompletion` (currently emits
one frame per turn)
- **F3**: per-route `auth_types` attribute in Go (currently applied via
route group middleware)

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-29 09:45:16 +08:00

398 lines
14 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// bot_completion.go is the SSE envelope writer + ChatbotCompletion
// service path for /api/v1/chatbots/<dialog_id>/completions. The wire
// shape is dictated by the existing python
// `api/db/services/conversation_service.py::async_iframe_completion`
// — JS widgets reading the iframe SDK expect this exact envelope, so
// any change to the frame keys is a wire-contract change.
//
// Frame shape (one JSON object per `data:` line):
//
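// {"code":0,"message":"","data":{"answer":"...","reference":{...},
// "audio_binary":null,"id":null,"session_id":"..."}}
//
// (WriteChatbotFrame currently always emits "id":null; error frames
// carry code 500 and only "answer" and "reference" inside "data".)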
//
// The final completion marker is `data: {"code":0,"message":"",
// "data":true}` followed by the OpenAI-style `data: [DONE]` line
// that the existing Go SSE writers emit on the production
// /agents/chat/completions path.
package service
import (
"context"
"encoding/json"
"errors"
"net/http"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"ragflow/internal/agent/canvas"
"ragflow/internal/common"
"ragflow/internal/entity"
modelModule "ragflow/internal/entity/models"
)
// ChatbotSSEFrame is the in-process unit handed from the chatbot
// completion goroutine to the SSE writer. It is never serialised
// directly (every field is tagged `json:"-"`); WriteChatbotFrame
// builds the wire envelope from its fields instead.
//
// When Err is non-nil it takes precedence over Data: WriteChatbotFrame
// renders a code-500 frame with a generic placeholder message and does
// not echo the error text to the client.
type ChatbotSSEFrame struct {
	// Data is the answer text placed in the envelope's "answer" field.
	Data string `json:"-"`
	// Reference is placed in the envelope's "reference" field.
	Reference map[string]any `json:"-"`
	// SessionID is placed in the envelope's "session_id" field.
	SessionID string `json:"-"`
	// Done marks the end-of-stream sentinel sent as the last frame on
	// the channel. WriteChatbotFrame does not inspect it; the consumer
	// is expected to check it before choosing a writer.
	Done bool `json:"-"`
	// Err, when set, turns the frame into an error frame.
	Err error `json:"-"`
}
// WriteChatbotFrame emits one SSE frame of the form `data: <json>\n\n`
// and flushes w when it implements http.Flusher.
//
// Envelope:
//
//   - f.Err == nil: {"code":0,"message":"","data":{"answer":f.Data,
//     "reference":f.Reference,"audio_binary":null,"id":null,
//     "session_id":f.SessionID}}. A nil f.Reference is emitted as an
//     empty object ({}), never null, so clients can index into it
//     unconditionally and the shape matches the error frame.
//   - f.Err != nil: {"code":500,"message":<generic>,"data":{"answer":
//     <generic>,"reference":{}}}. The error text is deliberately NOT
//     echoed: internal errors (gorm stack frames, SQL details, storage
//     paths) must not reach the client. The caller is expected to log
//     the real error before publishing the frame.
//
// The JSON is produced by encoding/json from a map, so keys are sorted
// and HTML-sensitive characters are escaped; the frame is therefore
// JSON-equivalent to the python envelope rather than byte-identical.
//
// The whole frame is written with a single Write call so a failing
// writer cannot leave a truncated `data:` line on the wire. Marshal
// and write errors are returned unchanged; nothing is written when
// marshalling fails. f.Done is not consulted here.
func WriteChatbotFrame(w http.ResponseWriter, f ChatbotSSEFrame) error {
	var payload map[string]any
	if f.Err != nil {
		const clientErrMsg = "an internal error occurred"
		payload = map[string]any{
			"code":    500,
			"message": clientErrMsg,
			"data": map[string]any{
				"answer":    clientErrMsg,
				"reference": map[string]any{},
			},
		}
	} else {
		// A nil map marshals as JSON null; normalise to {} so the
		// "reference" key always holds an object.
		reference := f.Reference
		if reference == nil {
			reference = map[string]any{}
		}
		payload = map[string]any{
			"code":    0,
			"message": "",
			"data": map[string]any{
				"answer":       f.Data,
				"reference":    reference,
				"audio_binary": nil,
				"id":           nil,
				"session_id":   f.SessionID,
			},
		}
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	const prefix, suffix = "data: ", "\n\n"
	frame := make([]byte, 0, len(prefix)+len(body)+len(suffix))
	frame = append(frame, prefix...)
	frame = append(frame, body...)
	frame = append(frame, suffix...)
	if _, err := w.Write(frame); err != nil {
		return err
	}

	if flusher, ok := w.(http.Flusher); ok {
		flusher.Flush()
	}
	return nil
}
// WriteDoneFrame terminates an SSE stream. It writes, in order, the
// python-style completion marker
// `data: {"code":0,"message":"","data":true}\n\n` and the OpenAI-style
// terminator `data: [DONE]\n\n`, then flushes w if it implements
// http.Flusher.
//
// The first write error is returned immediately; in that case the
// remaining frame is not written and no flush is attempted.
func WriteDoneFrame(w http.ResponseWriter) error {
	terminators := [...]string{
		`data: {"code":0,"message":"","data":true}` + "\n\n",
		"data: [DONE]\n\n",
	}
	for _, frame := range terminators {
		if _, err := w.Write([]byte(frame)); err != nil {
			return err
		}
	}
	if fl, canFlush := w.(http.Flusher); canFlush {
		fl.Flush()
	}
	return nil
}
// WriteChatbotRunEvent renders one canvas.RunEvent as an SSE frame,
// giving the agent completion endpoints the same python-shaped
// envelope that WriteChatbotFrame produces:
//
//   - /api/v1/agents/chat/completions (was: writeChatCompletionSSE)
//   - /api/v1/agentbots/<id>/completions (was: WriteChatbotFrame per-event)
//
// Behaviour by event type:
//
//   - ev.Type == "done": writes the bare OpenAI-style terminator
//     `data: [DONE]\n\n` (no JSON envelope) and flushes w if it
//     implements http.Flusher.
//   - any other type: forwards to WriteChatbotFrame with ev.Data as the
//     answer, an empty reference object, and ev.SessionID. ev.Data is
//     copied verbatim; it is not decoded or validated here.
//
// NOTE(review): the earlier comment stated that canvas-runner output
// is itself a JSON string (e.g. `{"answer":"hi back","reference":[]}`)
// which the iframe SDK JSON.parse()s out of `answer` — confirm against
// the SDK before changing how ev.Data is forwarded.
//
// Every write error is returned unchanged so callers can stop
// streaming; this function does not itself distinguish client
// disconnects (such as io.ErrClosedPipe) from other failures.
func WriteChatbotRunEvent(w http.ResponseWriter, ev canvas.RunEvent) error {
	if ev.Type != "done" {
		return WriteChatbotFrame(w, ChatbotSSEFrame{
			Data:      ev.Data,
			Reference: map[string]any{},
			SessionID: ev.SessionID,
		})
	}

	if _, err := w.Write([]byte("data: [DONE]\n\n")); err != nil {
		return err
	}
	if fl, canFlush := w.(http.Flusher); canFlush {
		fl.Flush()
	}
	return nil
}
// AgentbotSSEFrame is a type alias (not a distinct type) for
// ChatbotSSEFrame, so values of either name are interchangeable and
// can be handed to the same writers without conversion. The envelope
// shape is therefore identical for both bot completion paths.
//
// NOTE(review): the earlier comment stated that the agentbot path
// differs only in that the LLM call goes through the canvas runner
// (AgentService.RunAgent) instead of the legacy dialog async_chat —
// that call site is not visible in this file; confirm before relying
// on it.
type AgentbotSSEFrame = ChatbotSSEFrame
// WriteAgentbotFrame writes one SSE frame for the agentbot completion
// path. It is a thin forwarding function (not a language-level alias):
// w and f are passed straight to WriteChatbotFrame and its error is
// returned unchanged, so both bot completion paths emit the same
// envelope.
func WriteAgentbotFrame(w http.ResponseWriter, f ChatbotSSEFrame) error {
return WriteChatbotFrame(w, f)
}
// ChatbotCompletion runs one chat turn for
// /api/v1/chatbots/<dialog_id>/completions and returns a channel of
// frames for the SSE writer.
//
// This is a v1 port: the LLM call is non-streaming, so the channel
// yields exactly one content (or error) frame followed by one
// ChatbotSSEFrame{Done: true} sentinel, and is then closed. The
// channel buffer is larger than the number of frames ever sent, so the
// producer goroutine never blocks on a slow or absent reader.
//
// Parameters:
//   - ctx: accepted for interface symmetry; it is currently not
//     consulted (the LLM driver call takes no context).
//   - tenantID: identifier of the requester; compared against
//     dialog.TenantID and session.UserID.
//   - dialogID: the chatbot (dialog) to talk to.
//   - req: req.SessionID selects an existing session (empty creates a
//     new one seeded with the dialog's prologue); req.Question is the
//     user turn.
//
// Returns (nil, code, err) when the request is rejected before the LLM
// call: CodeDataError for an inaccessible dialog, unknown session or
// missing/unresolvable LLM; CodeServerError for DAO failures or an
// unwired llmService. On success returns (channel, CodeSuccess, nil).
// An LLM failure after that point is reported in-band as a frame with
// Err set to a generic message; the real error is only logged.
//
// After a successful LLM call the user and assistant turns are
// appended to the stored history. Previously stored turns are kept
// verbatim (including their original created_at and any extra
// fields). A persistence failure is logged and does not fail the
// stream — the next call then rebuilds from the pre-turn snapshot,
// losing at most the latest exchange.
func (s *BotService) ChatbotCompletion(
	ctx context.Context, tenantID, dialogID string, req ChatbotCompletionRequest,
) (<-chan ChatbotSSEFrame, common.ErrorCode, error) {
	// 1. Load and authorise the dialog.
	//
	// ChatSessionDAO.GetDialogByID already filters by status = "1"
	// so a returned row is valid; we still nil-check defensively
	// before dereferencing for symmetry with the session path.
	dialog, err := s.chatDAO.GetDialogByID(dialogID)
	if err != nil || dialog == nil ||
		dialog.TenantID != tenantID ||
		dialog.Status == nil || *dialog.Status != common.StatusDialogValid {
		return nil, common.CodeDataError, errors.New("no access to this chatbot")
	}

	// 2. Resolve or create the session row.
	//
	// API4ConversationDAO.GetBySessionID returns (nil, nil) on miss
	// (not an error) — see internal/dao/api_token.go:146. We MUST
	// check the pointer before dereferencing, otherwise the
	// session-tenant check below nil-derefs. Plan Risk R7.
	//
	// UserID vs tenantID (security H3 follow-up):
	// `entity.API4Conversation.UserID` is a generic user-id slot
	// in the production Python flow
	// (api/db/services/conversation_service.py:258 — the python
	// async_iframe_completion saves `user_id=kwargs.get("user_id", "")`).
	// The Go BotHandler routes pass `user.ID` through the
	// "tenantID" parameter (the Go User struct collapses user and
	// tenant into one identifier — see project CLAUDE.md), so
	// writing `tenantID` here actually stores the requester's
	// user-id (== tenant-id) in the python user-id slot. The
	// session-tenant check on the read path compares against the
	// same value, so write/read stay symmetric. Do not "fix" this
	// to a tenant-id lookup; that would break the symmetry.
	var session *entity.API4Conversation
	if req.SessionID != "" {
		session, err = s.api4ConversationDAO.GetBySessionID(req.SessionID, dialogID)
		if err != nil {
			return nil, common.CodeServerError, err
		}
		if session == nil || session.UserID != tenantID {
			return nil, common.CodeDataError, errors.New("session not found")
		}
	} else {
		// Seed a new session. The Message column is json.RawMessage;
		// pre-serialise the prologue turn as a JSON array of
		// {role,content,created_at} dicts — same shape the python
		// conversation_service.py:253-272 writes. Plan Risk R4.
		prologue := stringFromMap(dialog.PromptConfig, "prologue")
		seedMsg, seedErr := json.Marshal([]map[string]any{
			{
				"role":       "assistant",
				"content":    prologue,
				"created_at": time.Now().Unix(),
			},
		})
		if seedErr != nil {
			return nil, common.CodeServerError, seedErr
		}
		session = &entity.API4Conversation{
			ID:       uuid.NewString(),
			DialogID: dialogID,
			UserID:   tenantID,
			Message:  seedMsg,
		}
		if err := s.api4ConversationDAO.Create(session); err != nil {
			return nil, common.CodeServerError, err
		}
	}

	// 3. Resolve the chat LLM via ModelProviderService. The python
	// async_iframe_completion resolves the same way through
	// LLMBundle(tenant_id, dialog.llm_id); the Go equivalent is
	// GetChatModelConfig → NewChatModel → driver.ChatWithMessages.
	//
	// An unwired llmService (test boot path) is a server-side
	// misconfiguration and yields CodeServerError. A dialog without
	// a usable LLM yields CodeDataError with a fixed message; the
	// underlying lookup error is logged, not returned, so it cannot
	// leak into the response.
	if s.llmService == nil {
		return nil, common.CodeServerError, errors.New("bot: llm service not wired")
	}
	if dialog.LLMID == "" {
		return nil, common.CodeDataError, errors.New("no LLM configured for this chatbot")
	}
	modelProvider := NewModelProviderService()
	driver, modelName, apiConfig, _, err := modelProvider.GetChatModelConfig(tenantID, dialog.LLMID)
	if err != nil {
		common.Error("bot: ChatbotCompletion model config lookup failed",
			err,
			zap.String("dialog_id", dialogID),
			zap.String("session_id", session.ID),
			zap.String("llm_id", dialog.LLMID),
		)
		return nil, common.CodeDataError, errors.New("no LLM configured for this chatbot")
	}
	chatModel := modelModule.NewChatModel(driver, &modelName, apiConfig)

	// 4. Build the prompt from prior conversation history plus the
	// new user turn. Without this, a resumed session_id would
	// authorise reuse but the LLM call would still be stateless
	// turn-to-turn — a Python parity regression for any multi-turn
	// chatbot client.
	messages := historyToMessages(session.Message)
	messages = append(messages, modelModule.Message{Role: "user", Content: req.Question})
	// Stamp the user turn with the time it was asked, not the time
	// the (possibly slow) LLM call returns.
	askedAt := time.Now().Unix()

	// 5. Yield frames on a channel. At most two frames are sent, so
	// the buffer guarantees the goroutine never blocks on send.
	out := make(chan ChatbotSSEFrame, 4)
	go func() {
		defer close(out)
		resp, callErr := chatModel.ModelDriver.ChatWithMessages(
			modelName, messages, chatModel.APIConfig, &modelModule.ChatConfig{},
		)
		if callErr != nil {
			// Log the real error with structured context so
			// ops can debug, but do NOT echo the raw
			// err.Error() to the client (security M2:
			// internal gorm/SQL/file-path leaks).
			common.Error("bot: ChatbotCompletion LLM call failed",
				callErr,
				zap.String("dialog_id", dialogID),
				zap.String("session_id", session.ID),
				zap.String("llm_id", dialog.LLMID),
			)
			out <- ChatbotSSEFrame{
				Err:       errors.New("an internal error occurred"),
				SessionID: session.ID,
			}
			out <- ChatbotSSEFrame{Done: true}
			return
		}
		answer := ""
		if resp != nil && resp.Answer != nil {
			answer = *resp.Answer
		}

		// Persist the new turn pair (user + assistant) so the NEXT
		// call with the same session_id sees this turn. Stored
		// turns are appended to, never rebuilt, so their original
		// timestamps and any extra fields survive. Failures are
		// logged but do NOT fail the stream — the answer has
		// already been produced.
		updated, mErr := appendChatbotTurns(session.Message,
			map[string]any{"role": "user", "content": req.Question, "created_at": askedAt},
			map[string]any{"role": "assistant", "content": answer, "created_at": time.Now().Unix()},
		)
		if mErr != nil {
			common.Error("bot: ChatbotCompletion history encode failed",
				mErr,
				zap.String("dialog_id", dialogID),
				zap.String("session_id", session.ID),
			)
		} else {
			session.Message = updated
			if uErr := s.api4ConversationDAO.Update(session); uErr != nil {
				common.Error("bot: ChatbotCompletion session update failed",
					uErr,
					zap.String("dialog_id", dialogID),
					zap.String("session_id", session.ID),
				)
			}
		}

		out <- ChatbotSSEFrame{
			Data:      answer,
			Reference: map[string]any{},
			SessionID: session.ID,
		}
		out <- ChatbotSSEFrame{Done: true}
	}()
	return out, common.CodeSuccess, nil
}

// appendChatbotTurns returns the JSON array stored in raw with turns
// appended at the end. Existing elements are carried over without
// being decoded, so their fields are preserved exactly. An empty or
// unparsable raw is treated as an empty history.
func appendChatbotTurns(raw json.RawMessage, turns ...map[string]any) ([]byte, error) {
	var history []json.RawMessage
	if len(raw) > 0 {
		if err := json.Unmarshal(raw, &history); err != nil {
			// Malformed column: start over rather than fail the turn.
			history = nil
		}
	}
	for _, turn := range turns {
		encoded, err := json.Marshal(turn)
		if err != nil {
			return nil, err
		}
		history = append(history, encoded)
	}
	return json.Marshal(history)
}
// historyToMessages converts the stored conversation — a JSON array of
// objects carrying at least "role" and "content" — into the message
// slice passed to the LLM driver.
//
// Only the "role" and "content" keys are read; other keys are ignored.
// A turn is skipped when either value is missing, is not a JSON
// string, or is empty. An empty raw, or one that does not decode as an
// array of objects, yields nil so the caller can still append the new
// user turn and send a prompt.
func historyToMessages(raw json.RawMessage) []modelModule.Message {
	if len(raw) == 0 {
		return nil
	}

	var stored []map[string]any
	if json.Unmarshal(raw, &stored) != nil {
		return nil
	}

	msgs := make([]modelModule.Message, 0, len(stored))
	for _, turn := range stored {
		// A failed assertion leaves the zero value "", which the
		// emptiness check below filters out.
		role, _ := turn["role"].(string)
		text, _ := turn["content"].(string)
		if role != "" && text != "" {
			msgs = append(msgs, modelModule.Message{Role: role, Content: text})
		}
	}
	return msgs
}
// historyFromMessages is the inverse projection of historyToMessages:
// it turns LLM messages back into the {role, content, created_at}
// objects written to the api_4_conversation.Message column.
//
// The messages carry no timestamps, so created_at is synthesised. The
// last message is stamped with the current Unix second and each
// earlier message one second before its successor. This keeps the
// sequence strictly increasing in slice order while guaranteeing that
// no timestamp lies in the future, so a turn the caller appends
// afterwards with time.Now() never sorts before the history it
// follows.
//
// Always returns a non-nil slice with one entry per input message.
func historyFromMessages(msgs []modelModule.Message) []map[string]any {
	out := make([]map[string]any, 0, len(msgs))
	// first + (len-1) == now, i.e. the newest turn is anchored at now.
	first := time.Now().Unix() - int64(len(msgs)) + 1
	for i, m := range msgs {
		out = append(out, map[string]any{
			"role":       m.Role,
			"content":    m.Content,
			"created_at": first + int64(i),
		})
	}
	return out
}