ragflow/cmd/server_main.go

//go:build ignore
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package main
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"ragflow/internal/common"
	"ragflow/internal/engine/redis"
	"ragflow/internal/server"
	"ragflow/internal/server/local"
	"ragflow/internal/storage"
	"ragflow/internal/utility"
	"strings"
	"syscall"
	"time"

	"github.com/gin-gonic/gin"
	"go.uber.org/zap"

	"ragflow/internal/agent/audio"
	"ragflow/internal/agent/canvas"
	_ "ragflow/internal/agent/component" // blank import: registers every Component factory (Begin / Agent / LLM / Message / Retrieval / ...) into the shared runtime at package init
	"ragflow/internal/agent/runtime"
	agenttool "ragflow/internal/agent/tool"
	"ragflow/internal/dao"
	"ragflow/internal/engine"
	"ragflow/internal/handler"
	"ragflow/internal/router"
	"ragflow/internal/service"
	"ragflow/internal/service/chunk"
	"ragflow/internal/service/nlp"
	"ragflow/internal/tokenizer"
)
func printHelp() {
	fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS]\n\n", os.Args[0])
	fmt.Fprintf(os.Stderr, "RAGFlow Server - Open-source RAG engine based on deep document understanding\n\n")
	fmt.Fprintf(os.Stderr, "Options:\n")
	fmt.Fprintf(os.Stderr, " -p, --port int\t\tServer port (overrides config file)\n")
	fmt.Fprintf(os.Stderr, " -v, --version \tPrint version information and exit\n")
	fmt.Fprintf(os.Stderr, " --debug \tEnable debug-level logging\n")
	fmt.Fprintf(os.Stderr, " -h, --help \tShow this help message and exit\n")
	fmt.Fprintf(os.Stderr, "\nExamples:\n")
	fmt.Fprintf(os.Stderr, " %s \t\t# Start server with config file port\n", os.Args[0])
	fmt.Fprintf(os.Stderr, " %s -p 8080 \t\t# Start server on port 8080\n", os.Args[0])
	fmt.Fprintf(os.Stderr, " %s --port 8080 \t# Start server on port 8080\n", os.Args[0])
	fmt.Fprintf(os.Stderr, " %s --version \t# Show version and exit\n", os.Args[0])
	fmt.Fprintf(os.Stderr, " %s --debug \t# Start server with debug logging\n", os.Args[0])
}
func main() {
	// Parse command line flags
	var portFlag int
	flag.IntVar(&portFlag, "port", 0, "Server port (overrides config file)")
	flag.IntVar(&portFlag, "p", 0, "Server port (shorthand, overrides config file)")
	var debugFlag bool
	flag.BoolVar(&debugFlag, "debug", false, "Enable debug-level logging")
	var versionFlag bool
	flag.BoolVar(&versionFlag, "version", false, "Print version information and exit")
	// Register the -v shorthand that printHelp advertises; without it, -v fails with "flag provided but not defined".
	flag.BoolVar(&versionFlag, "v", false, "Print version information and exit (shorthand)")

	// Custom help message
	flag.Usage = printHelp
	flag.Parse()
	// Handle --version flag: print version and exit immediately
	if versionFlag {
		fmt.Printf("RAGFlow version: %s\n", utility.GetRAGFlowVersion())
		return
	}

	// Temporarily default to debug while investigating the Go chat/SSE path.
	if err := common.Init("debug", common.FileOutput{Path: "server_main.log"}); err != nil {
		panic(fmt.Sprintf("Failed to initialize logger: %v", err))
	}

	// Initialize configuration
	if err := server.Init(""); err != nil {
		common.Fatal("Failed to initialize config", zap.Error(err))
	}

	// Override port with command line argument if provided
	config := server.GetConfig()
	if portFlag > 0 {
		config.Server.Port = portFlag
		common.Info("Port overridden by command line argument", zap.Int("port", portFlag))
	}
	if config.Server.Port == 0 {
		common.Fatal("Server port is not configured. Please specify via --port flag or config file.")
	}
	// Reinitialize the logger with the configured level and file-rotation settings.
	// An empty configured level falls back to debug, and --debug always forces debug.
	level := config.Log.Level
	if level == "" {
		level = "debug"
	}
	if debugFlag {
		level = "debug"
	}
	fileOut := common.FileOutput{
		Path:       "server_main.log",
		MaxSize:    config.Log.MaxSize,
		MaxBackups: config.Log.MaxBackups,
		MaxAge:     config.Log.MaxAge,
		Compress:   common.ResolveCompress(config.Log.Compress),
	}
	if config.Log.Path != "" {
		fileOut.Path = config.Log.Path
	}
	if err := common.Init(level, fileOut); err != nil {
		common.Error("Failed to reinitialize logger", err)
	}
	server.SetLogger(common.Logger)
	if config.Log.Level == "" {
		config.Log.Level = common.GetLevel()
	}
	common.Info("Server mode", zap.String("mode", config.Server.Mode))

	// Print all configuration settings
	server.PrintAll()
	// Initialize database
	if err := dao.InitDB(); err != nil {
		common.Fatal("Failed to initialize database", zap.Error(err))
	}

	// Initialize doc engine
	if err := engine.Init(&config.DocEngine); err != nil {
		common.Fatal("Failed to initialize doc engine", zap.Error(err))
	}
	defer engine.Close()

	// Initialize Redis cache
	if err := redis.Init(&config.Redis); err != nil {
		common.Fatal("Failed to initialize Redis", zap.Error(err))
	}
	defer redis.Close()

	if err := storage.InitStorageFactory(); err != nil {
		common.Fatal("Failed to initialize storage factory", zap.Error(err))
	}
2026-06-12 14:56:44 +08:00
if err := engine.InitMessageQueueEngine(config.TaskExecutor.MessageQueueType); err != nil {
common.Error("Failed to initialize message queue engine", err)
}
// Initialize server variables (runtime variables that can change during operation)
// This must be done after Cache is initialized
if err := server.InitVariables(redis.Get()); err != nil {
common.Warn("Failed to initialize server variables from Redis, using defaults", zap.String("error", err.Error()))
}
// Initialize admin status (default: unavailable=1)
local.InitAdminStatus(1, "admin server not connected")
// Initialize tokenizer (rag_analyzer)
feat: migrate DELETE /api/v1/datasets/:dataset_id/documents to Go (#15577) ## Summary Migrate the batch document deletion endpoint from Python to Go. Two modes supported: explicit `ids` list and `delete_all`. ## Changes | File | Change | |------|--------| | `internal/dao/file2document.go` | Add `GetByDocumentID`, `DeleteByDocumentID` | | `internal/dao/file2document_test.go` | 5 new tests | | `internal/dao/kb_test.go` | 2 new tests (`DecreaseDocumentNum`) | | `internal/service/document.go` | Add `deleteDocumentFull` + `DeleteDocuments`, refactor `DeleteDocument` | | `internal/service/document_test.go` | 10 new tests | | `internal/handler/document.go` | Add `documentServiceIface` + `DeleteDocuments` handler | | `internal/handler/document_test.go` | 7 new tests | | `internal/router/router.go` | Register `DELETE /:dataset_id/documents` | | `cmd/server_main.go` | Support `RAGFLOW_DICT_PATH` env var | | `internal/binding/rag_analyzer.go` | Use `-lpcre2-8` dynamic linking | | `internal/dao/database.go` | Skip Error 1091/1138 during migration | | `internal/service/llm.go` | Fix vet warning | ## Per-document cleanup - Delete tasks from DB - Hard-delete document + decrement KB counters - Delete chunks from document engine (nil-guarded) - Delete metadata from document engine (nil-guarded) - Remove file2document mapping + file record + storage blob ## Test Results **24 unit tests all passing** (7 DAO + 10 service + 7 handler) using SQLite :memory: + gin.TestMode. See [test report](docs/test_report_delete_documents.md) for manual integration test results. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:55:53 +08:00
dictPath := os.Getenv("RAGFLOW_DICT_PATH")
if dictPath == "" {
dictPath = "/usr/share/infinity/resource"
}
tokenizerCfg := &tokenizer.PoolConfig{
DictPath: dictPath,
}
if err := tokenizer.Init(tokenizerCfg); err != nil {
common.Fatal("Failed to initialize tokenizer", zap.Error(err))
}
defer tokenizer.Close()
// Initialize global QueryBuilder using tokenizer's DictPath
// This ensures the Synonym uses the same wordnet directory as tokenizer
if err := nlp.InitQueryBuilderFromTokenizer(tokenizerCfg.DictPath); err != nil {
common.Fatal("Failed to initialize query builder", zap.Error(err))
}
startServer(config)
common.Info("Server exited")
}
func startServer(config *server.Config) {
// Set Gin mode
if config.Server.Mode == "release" {
gin.SetMode(gin.ReleaseMode)
} else {
gin.SetMode(gin.DebugMode)
}
// Initialize service layer
userService := service.NewUserService()
documentService := service.NewDocumentService()
datasetsService := service.NewDatasetService()
knowledgebaseService := service.NewKnowledgebaseService()
metadataService := service.NewMetadataService()
chunkService := chunk.NewChunkService()
llmService := service.NewLLMService()
tenantService := service.NewTenantService()
chatService := service.NewChatService()
chatChannelService := service.NewChatChannelService()
langfuseService := service.NewLangfuseService()
chatSessionService := service.NewChatSessionService()
openaiChatService := service.NewOpenAIChatService()
systemService := service.NewSystemService()
connectorService := service.NewConnectorService()
searchService := service.NewSearchService()
fileService := service.NewFileService()
memoryService := service.NewMemoryService()
mcpService := service.NewMCPService()
modelProviderService := service.NewModelProviderService()
// Initialize doc engine for skill search
docEngine := engine.Get()
documentDAO := dao.NewDocumentDAO()
agenttool.SetRetrievalService(agenttool.NewNLPRetrievalAdapterFromDeps(docEngine, documentDAO))
common.Info("agent: retrieval service adapter installed")
// Initialize handler layer
authHandler := handler.NewAuthHandler()
userHandler := handler.NewUserHandler(userService)
tenantHandler := handler.NewTenantHandler(tenantService, userService, knowledgebaseService)
documentHandler := handler.NewDocumentHandler(documentService, datasetsService)
datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService)
systemHandler := handler.NewSystemHandler(systemService)
knowledgebaseHandler := handler.NewKnowledgebaseHandler(knowledgebaseService, userService, documentService)
chunkHandler := handler.NewChunkHandler(chunkService, userService)
llmHandler := handler.NewLLMHandler(llmService, userService)
chatHandler := handler.NewChatHandler(chatService, userService)
chatChannelHandler := handler.NewChatChannelHandler(chatChannelService)
langfuseHandler := handler.NewLangfuseHandler(langfuseService)
chatSessionHandler := handler.NewChatSessionHandler(chatSessionService, userService)
openaiChatHandler := handler.NewOpenAIChatHandler(openaiChatService)
connectorHandler := handler.NewConnectorHandler(connectorService, userService)
searchHandler := handler.NewSearchHandler(searchService, userService)
fileHandler := handler.NewFileHandler(fileService, userService)
memoryHandler := handler.NewMemoryHandler(memoryService)
mcpHandler := handler.NewMCPHandler(mcpService)
skillSearchHandler := handler.NewSkillSearchHandler(docEngine)
providerHandler := handler.NewProviderHandler(userService, modelProviderService)
// Install the agent service's Redis-backed run infrastructure
// (CheckPointStore / StateSerializer / RunTracker). When Redis
// is unreachable (degraded boot, stand-alone mode, no-redis CI)
// the constructors return errors and we fall through to the
// in-memory / no-tracking path: the agent service treats nil
// options as the in-memory test path, so graceful degradation
// is a 1-line if-not-nil pass-through — no separate "boot" mode
// required.
agentOpts := buildAgentRunOptions()
feat[Go]: port agent attachment download, chatbot + agentbot completion/info endpoints from Python (#16405) ## Summary Ports five Python agent APIs to Go under the v1 Gin router: - `GET /api/v1/agents/attachments/<attachment_id>/download` - `POST /api/v1/chatbots/<dialog_id>/completions` (SSE) - `GET /api/v1/chatbots/<dialog_id>/info` - `POST /api/v1/agentbots/<agent_id>/completions` (SSE) - `GET /api/v1/agentbots/<agent_id>/inputs` Mirrors the existing Python wire shape (`{code, message, data:{answer,reference,...}}` per Python `canvas_service.completion`) so the iframe SDK and existing JS widgets keep working. ## Behavioural parity with Python | # | Concern | How it's met | |---|---------|--------------| | R0 | Bot routes must not require regular user session | Routes mount on `apiNoAuth` (router.go:198-202), with `BetaAuthMiddleware` only | | R3 | Two SSE formats in Go drift | F2: `AgentChatCompletions` and `AgentbotCompletion` share `service.WriteChatbotRunEvent` | | R7 | `GetBySessionID` returns `(nil, nil)` on miss | Defensive nil-check before `session.UserID != tenantID` | | R8 | Begin component name vs ID | `FindBeginComponentID` resolves name → ID first, then `ExtractComponentInputForm(dsl, beginID)` | | R9 | Defensive PromptConfig parsing | `stringFromMap` helper used for `prologue` and `tavily_api_key` | | R10 | `BetaAuthMiddleware` Bearer-prefix pre-filter | Removed — `GetUserByToken` is called unconditionally, falls back to `GetUserByBetaAPIToken` | | F8 | Multi-turn chatbot history | `ChatbotCompletion` reads prior turns from `session.Message`, appends user turn, calls LLM, persists new pair via new `API4ConversationDAO.Update` | | F9 | UUID gate stricter than plan | Removed — only `filepath.Base` + CR/LF/quote header sanitization remains | | H2 | Defence-in-depth IDOR | `AgentbotCompletion` calls `loadCanvas` before delegating to `RunAgent` | | M2 | SSE error leakage | `WriteChatbotFrame` emits generic `"an internal error occurred"`; real error 
logged via `common.Error` | ## Verification ```bash $ go vet ./... # clean (only pre-existing issues) $ go build ./... # success $ go test ./internal/handler/ ./internal/service/ ./internal/agent/dsl/ ./internal/common/ ./internal/dao/ ok ragflow/internal/handler 0.617s ok ragflow/internal/service 1.729s ok ragflow/internal/agent/dsl 0.008s ok ragflow/internal/common 0.087s ok ragflow/internal/dao 0.083s ``` 1199 tests pass across 5 packages. ## Known follow-ups (out of scope for this PR) - **F1**: token-level streaming in `ChatbotCompletion` (currently emits one frame per turn) - **F3**: per-route `auth_types` attribute in Go (currently applied via route group middleware) --------- Co-authored-by: Claude <noreply@anthropic.com>
2026-06-27 16:52:21 +08:00
agentService := service.NewAgentServiceWithOptions(
agentOpts.checkpointStore,
agentOpts.stateSerializer,
agentOpts.runTracker,
)
agentHandler := handler.NewAgentHandler(agentService, fileService)
// Public chatbot/agentbot endpoints (api/v1/chatbots/...,
// api/v1/agentbots/...) and the agent attachment download.
// BotService delegates the agentbot completion to agentService so
// both paths share the same canvas runner. Reuse the llmService
// constructed above with the other services; do NOT redeclare it
// with `:=`, since the variable is already in scope.
botService := service.NewBotService(agentService, llmService)
botHandler := handler.NewBotHandler(botService)
// Wire the TTS synthesizer to the per-tenant model-provider
// dispatch. SynthesizeRequest is routed through
// ModelProviderService.AudioSpeech, which fans out to the
// tenant's configured TTS model driver. When the model
// provider is unconfigured, the synthesizer falls back to a
// no-op echo (the audio package contract), so this is always
// safe to call.
configureTTSSynthesizer(modelProviderService)
searchBotLLM := &handler.SearchBotRealLLM{Svc: modelProviderService}
feat: implement POST /api/v1/searchbots/retrieval_test (#15710) ## What problem does this PR solve? Implements `POST /api/v1/searchbots/retrieval_test` in the Go API server, aligning with the Python `bot_api.py` counterpart. Also applies security hardening and consistency fixes discovered during CTO-level code review: - **Missing endpoint**: `retrieval_test` was not available in Go, requiring Python fallback - **Security**: Both `chunkHandler` and `searchBotHandler` leaked `err.Error()` to API consumers - **Python alignment**: Default values, empty question handling, and `top_k <= 0` validation differed from Python behavior - **Test gaps**: `chunkHandler.RetrievalTest` had zero unit tests; several edge cases uncovered ## Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Refactoring ## Summary ### New Endpoint - `POST /api/v1/searchbots/retrieval_test` — retrieval test with full field support (page, size, top_k, use_kg, cross_languages, keyword, similarity_threshold, vector_similarity_weight) ### New Type - `common.StringSlice` — JSON type that accepts both `"kb1"` and `["kb1", "kb2"]`, matching Python API flexibility ### Security - Both `searchBotHandler` and `chunkHandler` now use `common.Warn()` + generic error messages instead of leaking `err.Error()` to API consumers - All error responses include consistent `"data": nil` shape - `chunkHandler.RetrievalTest` uses interface-based DI (`chunkService`) to enable testability ### Python Alignment - Handler-level defaults align with Python `bot_api.py` (page=1, size=30, top_k=1024, similarity_threshold=0.0, vector_similarity_weight=0.3) - `top_k <= 0` validation matching Python behavior - Empty/whitespace question returns 200 + empty result (matches `chunk_api.py`) - `chunkHandler` `Datasets` field uses `common.StringSlice` for string-or-array flexibility ### Refactoring - `ChunkServiceIface` → `ChunkRetriever`, `chunkSvcIface` 
→ `chunkService` (Go-conventional naming) - Extracted `applyRetrievalDefaults`, `toRetrievalServiceRequest` from handler body - Regex moved to package-level var in `parseRelatedQuestions` - `service.RetrievalTestRequest.Datasets` type changed to `common.StringSlice` - `chunkHandler` now uses consumer-side interface for DI ### Tests - 37 unit tests across both handlers: auth, validation, defaults, StringSlice edge cases, empty/whitespace KbID, service errors, JSON format, `top_k <= 0`, field mapping verification ## Files Changed | File | Change | |------|--------| | `cmd/server_main.go` | Wire new handler + chunkService + difyRetrievalHandler | | `internal/common/json_types.go` | New StringSlice type | | `internal/common/json_types_test.go` | StringSlice tests | | `internal/handler/chunk.go` | Interface-based DI, security, Python alignment, defaults | | `internal/handler/chunk_test.go` | New — 9 comprehensive tests | | `internal/handler/searchbot.go` | New endpoint + refactoring + `top_k <= 0` validation | | `internal/handler/searchbot_test.go` | 18 tests covering all edge cases | | `internal/router/router.go` | Register new route + difyRetrievalHandler | | `internal/service/chunk.go` | Datasets type → StringSlice, Question binding relaxed | 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 16:16:56 +08:00
searchBotHandler := handler.NewSearchBotHandler(
searchService,
tenantService,
searchBotLLM,
chunkService,
)
searchBotHandler.SetStreamLLM(searchBotLLM)
searchBotHandler.SetAskService(service.NewAskService(chunkService, nil, 0, 0))
pluginHandler := handler.NewPluginHandler(service.NewPluginService())
Go: new CLI command, list all models and show model (#15786) ### What problem does this PR solve? ``` RAGFlow(user)> list models; +---------------------------+------------+-------------+--------------------+---------------------------------------------+ | alias | max_tokens | model_types | name | thinking | +---------------------------+------------+-------------+--------------------+---------------------------------------------+ | | 1048576 | [chat] | deepseek-v4-flash | map[clear_thinking:true default_value:true] | | | 1048576 | [chat] | deepseek-v4-pro | map[clear_thinking:true default_value:true] | | | 1024000 | [chat] | minimax-m3 | map[clear_thinking:true default_value:true] | | | 64000 | [vision] | glm-4.5v | map[clear_thinking:true default_value:true] | | [baai/bge-m3] | 8192 | [embedding] | bge-m3 | | | [baai/bge-reranker-v2-m3] | 1024 | [rerank] | bge-reranker-v2-m3 | | | | | [tts] | step-audio-tts-3b | | | [qwen/qwen3-asr-1.7b] | | [asr] | qwen3-asr-1.7b | | | [paddleocr-vl-1.5] | | [ocr] | paddleocr-vl-0.9b | | +---------------------------+------------+-------------+--------------------+---------------------------------------------+ RAGFlow(user)> show model 'minimax-m3'; +--------------+---------------------------------------------+ | field | value | +--------------+---------------------------------------------+ | name | minimax-m3 | | max_tokens | 1024000 | | model_types | [chat] | | thinking | map[clear_thinking:true default_value:true] | | class | | | alias | | | ModelTypeMap | | +--------------+---------------------------------------------+ RAGFlow(user)> show model 'baai/bge-m3'; +--------------+---------------+ | field | value | +--------------+---------------+ | model_types | [embedding] | | thinking | | | class | | | alias | [baai/bge-m3] | | ModelTypeMap | | | name | bge-m3 | | max_tokens | 8192 | +--------------+---------------+ ``` --------- Signed-off-by: Jin Hai <haijin.chn@gmail.com>
2026-06-08 21:38:15 +08:00
modelHandler := handler.NewModelHandler(service.NewModelProviderService())
Add git-like file commit API (#15978) ### What problem does this PR solve? | # | Method | Endpoint | Description | Git Equivalent | |---|--------|----------|-------------|----------------| | 1 | `POST` | `/api/v1/{prefix}/{folder_id}/commits` | Create a snapshot commit with file changes (add/modify/delete/rename) | `git add` + `git commit` | | 2 | `GET` | `/api/v1/{prefix}/{folder_id}/commits` | List commit history (paginated) | `git log` | | 3 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/{commit_id}` | Get commit detail with file changes | `git show` | | 4 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/{commit_id}/files` | List file changes in a commit | `git show --name-status` | | 5 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/diff?from=...&to=...` | Compare two commits and return differences | `git diff` | | 6 | `GET` | `/api/v1/{prefix}/{folder_id}/changes` | Get uncommitted changes (add/modify/delete) | `git status` | | 7 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/{commit_id}/tree` | Get the folder tree snapshot at commit time | `git ls-tree` | | 8 | `GET` | `/api/v1/{prefix}/{folder_id}/commits/{commit_id}/files/{file_id}/content` | Get a file's content as it existed in a specific commit | `git show HEAD:file` | | 9 | `GET` | `/api/v1/{prefix}/{file_id}/versions` | Get version history for a specific file across all commits | `git log -- file` | Where `{prefix}/{id}` can be: - `folders/{folder_id}` — direct folder access - `workspaces/{workspace_id}` — alias of `folders/{folder_id}` - `datasets/{dataset_id}` — resolves to the dataset's folder - `memories/{memory_id}` — resolves to the memory's folder - `skills/{skill_id}` — resolves to the skill's folder ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Documentation Update
2026-06-15 11:19:56 +08:00
fileCommitHandler := handler.NewFileCommitHandler(service.NewFileCommitService())
// Dify retrieval handler
docDAO := documentDAO
retrievalService := nlp.NewRetrievalService(docEngine, docDAO)
difyRetrievalHandler := handler.NewDifyRetrievalHandler(
knowledgebaseService,
modelProviderService,
metadataService,
retrievalService,
docDAO,
docEngine,
)
// Per-tenant canvas-runtime override selector, backed by the
// existing Redis client and the global logger. The handler is
// ALWAYS constructed, even when Redis is briefly unavailable at
// startup, so the POST /api/v1/admin/canvas-runtime/:tenant_id
// endpoint stays registered and returns the explicit
// ErrSelectorNotConfigured (HTTP 500) path until Redis recovers.
// Skipping handler construction when rdb == nil silently removed
// the route until the next process restart, so a transient
// Redis blip at boot stranded canary operators with a 404 they
// could not diagnose from the client side. Keep the route hot.
var adminRuntimeSelector *runtime.Selector
if rdb := redis.Get().GetClient(); rdb != nil {
adminRuntimeSelector = runtime.NewSelector(rdb, common.Logger)
}
adminRuntimeHandler := handler.NewAdminRuntimeHandler(adminRuntimeSelector)
// Initialize router
r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatChannelHandler, langfuseHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, skillSearchHandler, providerHandler, agentHandler, searchBotHandler, difyRetrievalHandler, pluginHandler, modelHandler, fileCommitHandler, adminRuntimeHandler, openaiChatHandler, botHandler)
// Create Gin engine
ginEngine := gin.New()
// Middleware
// Note: common.GinLogger() is registered inside router.Setup so the
// HTTP request log captures every endpoint the router owns (including
// those registered by Setup itself). Registering it here would run
// it twice for those endpoints and double every access-log line.
ginEngine.Use(gin.Recovery())
// Setup routes
r.Setup(ginEngine)
// Create HTTP server with timeouts to prevent slow clients from blocking shutdown
addr := fmt.Sprintf(":%d", config.Server.Port)
srv := &http.Server{
Addr: addr,
Handler: ginEngine,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 60 * time.Second,
WriteTimeout: 120 * time.Second,
IdleTimeout: 120 * time.Second,
}
// Start server in a goroutine
go func() {
common.Info(
"\n ____ ___ ______ ______ __\n" +
" / __ \\ / | / ____// ____// /____ _ __\n" +
" / /_/ // /| | / / __ / /_ / // __ \\| | /| / /\n" +
" / _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ /\n" +
" /_/ |_|/_/ |_|\\____//_/ /_/ \\____/ |__/|__/\n",
)
common.Info(fmt.Sprintf("RAGFlow Go Version: %s", utility.GetRAGFlowVersion()))
common.Info(fmt.Sprintf("Server starting on port: %d", config.Server.Port))
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
common.Fatal("Failed to start server", zap.Error(err))
}
}()
// Get local IP address for heartbeat reporting
localIP, err := utility.GetLocalIP()
if err != nil {
common.Fatal("Failed to get local IP address", zap.Error(err))
}
// Initialize and start heartbeat reporter to admin server
service.AdminServiceClient = service.NewAdminClient(
common.Logger,
common.ServerTypeAPI,
fmt.Sprintf("ragflow-server-%d", config.Server.Port),
localIP,
config.Server.Port,
)
if err = service.AdminServiceClient.InitHTTPClient(); err != nil {
common.Warn("Failed to initialize heartbeat service", zap.Error(err))
} else {
// Start heartbeat reporter with a 3-second interval
heartbeatReporter := utility.NewScheduledTask("Heartbeat reporter", 3*time.Second, func() {
if err = service.AdminServiceClient.SendHeartbeat(); err == nil {
local.SetAdminStatus(0, "")
} else {
local.SetAdminStatus(1, err.Error())
//logger.Warn(fmt.Sprintf(err.Error()))
}
})
heartbeatReporter.Start()
defer heartbeatReporter.Stop()
}
// Wait for interrupt signal to gracefully shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2)
sig := <-quit
common.Info(fmt.Sprintf("Received %s signal, shutting down server", strings.ToUpper(sig.String())))
common.Info("Shutting down server...")
// Create context with timeout for graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Shutdown server
if err = srv.Shutdown(ctx); err != nil {
common.Fatal("Server forced to shutdown", zap.Error(err))
}
}
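// The start/wait/drain sequence in main can be sketched in isolation as
// follows. This is a minimal illustration, not the project's code:
// serveUntilSignal and demoShutdown are hypothetical names, the handler
// is a placeholder, and the quit channel is pre-loaded with SIGTERM so
// the program exits on its own. In a real server the channel would be
// fed by signal.Notify, as in main above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"syscall"
	"time"
)

// serveUntilSignal starts srv in a goroutine, blocks until a value
// arrives on quit or the listener fails, then gives in-flight requests
// up to grace to finish before returning.
func serveUntilSignal(srv *http.Server, quit <-chan os.Signal, grace time.Duration) error {
	serveErr := make(chan error, 1)
	go func() {
		// http.ErrServerClosed is the normal result of Shutdown, not a failure.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			serveErr <- err
		}
	}()

	select {
	case err := <-serveErr:
		return fmt.Errorf("listen: %w", err)
	case <-quit:
	}

	ctx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	return srv.Shutdown(ctx)
}

// demoShutdown wires the helper to a throwaway server on an ephemeral
// port and simulates an already-delivered SIGTERM.
func demoShutdown() error {
	srv := &http.Server{
		Addr:              "127.0.0.1:0",
		Handler:           http.NotFoundHandler(),
		ReadHeaderTimeout: 10 * time.Second,
	}
	quit := make(chan os.Signal, 1)
	quit <- syscall.SIGTERM
	return serveUntilSignal(srv, quit, 5*time.Second)
}

func main() {
	if err := demoShutdown(); err != nil {
		fmt.Println("shutdown error:", err)
		return
	}
	fmt.Println("server stopped cleanly")
}
```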
// agentRunOptions bundles the three optional injection slots the
// agent service accepts via NewAgentServiceWithOptions: the Redis-
// backed CheckPointStore, StateSerializer, and RunTracker. The
// fields stay nil when the underlying constructors fail (Redis
// unreachable, etc.); the agent service treats nil as "in-memory
// / no-tracking" so the server continues to serve traffic without
// requiring Redis to be up.
type agentRunOptions struct {
checkpointStore canvas.CheckPointStore
stateSerializer canvas.StateSerializer
runTracker *canvas.RunTracker
}
// buildAgentRunOptions installs the Redis-backed run infrastructure
// when Redis is available. The Redis client is the one already
// initialised at the top of main; the TTL is a conservative 24h for
// both the checkpoint store and the run tracker. When Redis is
// disabled or its client is nil, we log and return a zero-value
// struct, and the agent service falls back to the in-memory path
// transparently.
func buildAgentRunOptions() agentRunOptions {
var out agentRunOptions
if !redis.IsEnabled() || redis.Get() == nil {
common.Info("agent: redis client not initialised; agent run infra in in-memory mode (no checkpoints, no run tracker)")
return out
}
cp := canvas.NewRedisCheckPointStore(24 * time.Hour)
out.checkpointStore = cp
// stateSerializer is intentionally left nil. eino's default
// InternalSerializer (used when no compose.WithSerializer is
// passed at compile time) already knows how to round-trip
// runtime.CanvasState because the runtime package registers
// it via compose.RegisterSerializableType[CanvasState] in
// init(). Overriding with RAGFlow's plain-JSON
// CanvasStateSerializer (json.Marshal/Unmarshal) produces
// bytes the InternalSerializer cannot decode on the resume
// pass — the UserFillUp two-node pattern surfaces this as
// "load checkpoint from store fail: cannot unmarshal object
// into Go struct field checkpoint.Channels of type
// compose.channel". Rely on eino's default instead.
rt := canvas.NewRunTracker(24 * time.Hour)
out.runTracker = rt
common.Info("agent: redis-backed run infra installed (24h TTL on checkpoint store + run tracker; eino default serializer)")
return out
}
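// The "nil slot means the feature is off" convention used by
// agentRunOptions can be illustrated with a small stand-alone sketch.
// Everything here is hypothetical: checkpointStore, mapStore,
// runOptions, and agentService only mimic the shape of the real canvas
// types, and the map-backed store stands in for Redis. The one
// assumption carried over from the comments above is that a nil store
// disables checkpointing rather than causing an error.

```go
package main

import "fmt"

// checkpointStore is a hypothetical stand-in for canvas.CheckPointStore.
type checkpointStore interface {
	Save(runID string, data []byte) error
}

// mapStore is an illustrative backend; real code would talk to Redis.
type mapStore struct{ saved map[string][]byte }

func (m *mapStore) Save(runID string, data []byte) error {
	m.saved[runID] = append([]byte(nil), data...) // copy so callers can reuse the slice
	return nil
}

// runOptions holds optional dependencies; a nil field means "not installed".
type runOptions struct {
	store checkpointStore
}

// buildRunOptions returns the zero value when the backend is unavailable.
func buildRunOptions(backendUp bool) runOptions {
	var out runOptions
	if !backendUp {
		return out
	}
	out.store = &mapStore{saved: map[string][]byte{}}
	return out
}

type agentService struct{ opts runOptions }

// checkpoint reports whether the state was persisted. With a nil store
// it is a no-op, so callers never need to know whether the backend exists.
func (s *agentService) checkpoint(runID string, state []byte) (bool, error) {
	if s.opts.store == nil {
		return false, nil
	}
	if err := s.opts.store.Save(runID, state); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	for _, up := range []bool{false, true} {
		svc := &agentService{opts: buildRunOptions(up)}
		saved, err := svc.checkpoint("run-1", []byte(`{"step":1}`))
		fmt.Printf("backendUp=%v saved=%v err=%v\n", up, saved, err)
	}
}
```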
// configureTTSSynthesizer installs the audio.ModelProviderFunc
// that dispatches Synthesize requests through the project's
// ModelProviderService. The model provider's AudioSpeech method
// (internal/service/model_service.go) resolves the per-tenant TTS
// model driver, sends the request upstream, and returns
// synthesized audio bytes.
//
// The audio package's NewTTSDispatchFunc helper converts the
// audio.SynthesizeRequest shape into the model's dispatch shape
// (audioContent = req.Text, voice/lang → TTSConfig.Params,
// ModelName from req.Engine). When the model provider is
// unconfigured (nil dispatcher) the helper returns nil, which
// reverts the audio package to its default stub.
func configureTTSSynthesizer(modelProviderService *service.ModelProviderService) {
if modelProviderService == nil {
common.Info("agent: model provider service not initialised; TTS in no-op echo mode")
audio.SetModelProviderSynthesizer(nil)
return
}
audio.SetModelProviderSynthesizer(audio.NewTTSDispatchFunc(modelProviderService))
common.Info("agent: TTS model-provider dispatch installed (audio.Synthesize → ModelProviderService.AudioSpeech)")
}
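
// The install-or-revert behaviour of configureTTSSynthesizer follows a
// common pattern: a package-level function hook with a setter, where
// nil selects a built-in stub. The sketch below is an assumption-laden
// illustration, not the audio package's implementation: synthesizeFunc,
// setSynthesizer, and synthesize are hypothetical names, and the stub
// is assumed to echo the input text back as bytes (the log message
// above calls it a "no-op echo mode").

```go
package main

import (
	"fmt"
	"sync"
)

// synthesizeFunc is a hypothetical, simplified dispatch signature.
type synthesizeFunc func(text string) ([]byte, error)

var (
	mu       sync.RWMutex
	provider synthesizeFunc // nil means "use the stub"
)

// setSynthesizer installs f as the active provider; passing nil reverts to the stub.
func setSynthesizer(f synthesizeFunc) {
	mu.Lock()
	provider = f
	mu.Unlock()
}

// synthesize dispatches to the installed provider, or echoes the text
// when none is installed (assumed stub behaviour).
func synthesize(text string) ([]byte, error) {
	mu.RLock()
	f := provider
	mu.RUnlock()
	if f == nil {
		return []byte(text), nil
	}
	return f(text)
}

func main() {
	setSynthesizer(nil)
	out, _ := synthesize("hello")
	fmt.Printf("stub: %q\n", out)

	setSynthesizer(func(text string) ([]byte, error) {
		return []byte("audio:" + text), nil
	})
	out, _ = synthesize("hello")
	fmt.Printf("provider: %q\n", out)
}
```

// Copying the hook under a read lock before calling it keeps a slow
// provider from blocking a concurrent setSynthesizer call.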