Files
ragflow/internal/admin/router.go
Jin Hai e96bc37d06 Go: use NATS as the message queue (#15327)
### What problem does this PR solve?

```
RAGFlow(admin)> mq publish 'msg2';
SUCCESS
RAGFlow(admin)> mq publish 'msg3';
SUCCESS
RAGFlow(admin)> mq list;
+---------+---------------+
| message | subject       |
+---------+---------------+
| msg1    | tasks.RAGFLOW |
| msg2    | tasks.RAGFLOW |
| msg3    | tasks.RAGFLOW |
+---------+---------------+
RAGFlow(admin)> mq pull 2;
+---------+---------------+
| message | subject       |
+---------+---------------+
| msg1    | tasks.RAGFLOW |
| msg2    | tasks.RAGFLOW |
+---------+---------------+
RAGFlow(admin)> mq pull noack;
+---------+---------------+
| message | subject       |
+---------+---------------+
| abc     | tasks.RAGFLOW |
+---------+---------------+
RAGFlow(admin)> mq show
+-------------------+----------------+--------+---------------+---------------+-------------------+---------------+
| ack_pending_count | consumer_count | memory | message_count | pending_count | redelivered_count | waiting_count |
+-------------------+----------------+--------+---------------+---------------+-------------------+---------------+
| 2                 | 1              | 0      | 2             | 0             | 1                 | 0             |
+-------------------+----------------+--------+---------------+---------------+-------------------+---------------+

RAGFlow(admin)> list ingestors;
+--------------+-------------------------------------------+--------+
| host         | name                                      | status |
+--------------+-------------------------------------------+--------+
| 192.168.1.38 | ingestor-8f0e4bd5650a4ac58b0151969fbf6935 | alive  |
+--------------+-------------------------------------------+--------+

RAGFlow(admin)> list ingestion tasks;
+----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+
| document_id                      | id                               | status    | step | user        | user_id                          |
+----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+
| ffe64fae423411f1a2d938a74640adcc | 90d3d0f6528941c1ac8eb0360effccc4 | COMPLETED | 5    | aaa@aaa.com | 2ba4881420fa11f19e9c38a74640adcc |
+----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+

RAGFlow(admin)> remove ingestion tasks '90d3d0f6528941c1ac8eb0360effccc4';
+---------+----------------------------------+
| delete  | task_id                          |
+---------+----------------------------------+
| success | 90d3d0f6528941c1ac8eb0360effccc4 |
+---------+----------------------------------+

RAGFlow(admin)> stop ingestion tasks 'e89e20d9a25848a1b79bd9345ddbfe1d';
+----------+----------------------------------+
| status   | task_id                          |
+----------+----------------------------------+
| STOPPING | e89e20d9a25848a1b79bd9345ddbfe1d |
+----------+----------------------------------+

# Publish a message
RAGFlow(admin)> mq publish 'cdd';
SUCCESS

# List current tasks in the message queue
RAGFlow(admin)> mq list
+----------------------------------+---------------+
| message                          | subject       |
+----------------------------------+---------------+
| 7ce392a3c1624cd2be4b5276e8825059 | tasks.RAGFLOW |
+----------------------------------+---------------+

# Consume a task from the message queue
RAGFlow(admin)> mq pull
+------+-----+----------------+
| ack  | id  | type           |
+------+-----+----------------+
| true | cdd | ingestion_test |
+------+-----+----------------+

# User mode
# List ingestion tasks, followed by dataset id
RAGFlow(user)> list ingestion tasks from '0abe79f9423311f1ad8d38a74640adcc';
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+
| create_date               | create_time   | dataset_id                       | document_id                      | id                               | schema | status    | update_date               | update_time   | user_id                          |
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+
| 2026-05-30T20:21:06+08:00 | 1780143666289 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | 8d758cd14a8b4ba8ab505003fb52017d |        | COMPLETED | 2026-05-30T20:21:26+08:00 | 1780143686431 | 2ba4881420fa11f19e9c38a74640adcc |
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+

RAGFlow(user)> list ingestion tasks;
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+
| create_date               | create_time   | dataset_id                       | document_id                      | id                               | schema | status    | update_date               | update_time   | user_id                          |
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+
| 2026-06-02T19:02:31+08:00 | 1780398151417 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | e89e20d9a25848a1b79bd9345ddbfe1d |        | COMPLETED | 2026-06-02T19:02:52+08:00 | 1780398172208 | 2ba4881420fa11f19e9c38a74640adcc |
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+

# Create an ingestion task
# First argument is document id, second argument is dataset id
RAGFlow(user)> start ingestion 'ffe64fae423411f1a2d938a74640adcc' from '0abe79f9423311f1ad8d38a74640adcc';
+----------------------------------+-------------------------------------------+
| document_id                      | result                                    |
+----------------------------------+-------------------------------------------+
| ffe64fae423411f1a2d938a74640adcc | task_id: 8d758cd14a8b4ba8ab505003fb52017d |
+----------------------------------+-------------------------------------------+

# Pause an ingestion task, first argument is ingestion id
RAGFlow(user)> stop ingestion '8d758cd14a8b4ba8ab505003fb52017d';
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+
| create_date               | create_time   | dataset_id                       | document_id                      | id                               | schema | status    | update_date               | update_time   | user_id                          |
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+
| 2026-05-30T20:21:06+08:00 | 1780143666289 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | 8d758cd14a8b4ba8ab505003fb52017d |        | COMPLETED | 2026-05-30T20:21:26+08:00 | 1780143686431 | 2ba4881420fa11f19e9c38a74640adcc |
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+

# Delete an ingestion task
RAGFlow(api/default)> remove ingestion tasks 'f366450a27d54677aec1c7090add30f0';
+---------+----------------------------------+
| remove  | task_id                          |
+---------+----------------------------------+
| success | f366450a27d54677aec1c7090add30f0 |
+---------+----------------------------------+

```

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
2026-06-12 14:56:44 +08:00

160 lines
5.8 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package admin
import (
"github.com/gin-gonic/gin"
)
// Router wires the admin HTTP endpoints onto a gin engine.
// All endpoint functions are taken from the Handler it holds.
type Router struct {
	// handler provides the endpoint functions and the auth middleware
	// that Setup registers.
	handler *Handler
}
// NewRouter returns a Router whose routes are served by handler.
func NewRouter(handler *Handler) *Router {
	router := new(Router)
	router.handler = handler
	return router
}
// Setup registers every admin route on engine: the top-level health check,
// the public and authenticated endpoints under /api/v1/admin, and the
// handler for requests that match no route.
func (r *Router) Setup(engine *gin.Engine) {
	h := r.handler

	// Health check sits outside the admin prefix.
	engine.GET("/health", h.Health)

	// All remaining admin endpoints share the /api/v1/admin prefix.
	api := engine.Group("/api/v1/admin")

	// Public routes: registered directly on the prefix group, which has no
	// auth middleware attached.
	api.GET("/ping", h.Ping)
	api.POST("/login", h.Login)
	api.POST("/reports", h.Reports)
	// Disabled public ingestion endpoints, kept for reference:
	//api.POST("/ingestion/tasks", h.StartIngestionTask)
	//api.DELETE("/ingestion", h.CancelIngestionTask) // cancel ingestion
	//api.GET("/ingestion/tasks", h.ListIngestionTasks)

	// Protected routes: a sub-group with an empty relative path, so the URLs
	// keep the same prefix, with the middleware returned by AuthMiddleware
	// attached before any route is added.
	secured := api.Group("")
	secured.Use(h.AuthMiddleware())

	secured.POST("/logout", h.Logout)

	// Auth
	secured.GET("/auth", h.AuthCheck)

	// User management
	secured.GET("/users", h.ListUsers)
	secured.POST("/users", h.CreateUser)
	secured.GET("/users/:username", h.GetUser)
	secured.DELETE("/users/:username", h.DeleteUser)
	secured.PUT("/users/:username/password", h.ChangePassword)
	secured.PUT("/users/:username/activate", h.UpdateUserActivateStatus)
	secured.PUT("/users/:username/admin", h.GrantAdmin)
	secured.DELETE("/users/:username/admin", h.RevokeAdmin)
	secured.GET("/users/:username/datasets", h.GetUserDatasets)
	secured.GET("/users/:username/agents", h.GetUserAgents)

	// API keys: each operation is reachable under both /keys and /tokens.
	secured.GET("/users/:username/keys", h.ListUserAPITokens)
	secured.GET("/users/:username/tokens", h.ListUserAPITokens)
	secured.POST("/users/:username/keys", h.GenerateUserAPIToken)
	secured.POST("/users/:username/tokens", h.GenerateUserAPIToken)
	secured.DELETE("/users/:username/keys/:token", h.DeleteUserAPIToken)
	secured.DELETE("/users/:username/tokens/:token", h.DeleteUserAPIToken)

	// Role management
	secured.GET("/roles", h.ListRoles)
	secured.POST("/roles", h.CreateRole)
	secured.GET("/roles/:role_name", h.GetRole)
	secured.PUT("/roles/:role_name", h.UpdateRole)
	secured.DELETE("/roles/:role_name", h.DeleteRole)
	secured.GET("/roles/:role_name/permission", h.GetRolePermission)
	secured.POST("/roles/:role_name/permission", h.GrantRolePermission)
	secured.DELETE("/roles/:role_name/permission", h.RevokeRolePermission)

	// User roles and permissions
	secured.PUT("/users/:username/role", h.UpdateUserRole)
	secured.GET("/users/:username/permission", h.GetUserPermission)

	// Service management
	secured.GET("/services", h.GetServices)
	secured.GET("/service_types/:service_type", h.GetServicesByType)
	secured.GET("/services/:service_id", h.GetService)
	secured.DELETE("/services/:service_id", h.ShutdownService)
	secured.PUT("/services/:service_id", h.RestartService)

	// Variables/Settings
	secured.GET("/variables", h.GetVariables)
	secured.PUT("/variables", h.SetVariable)

	// Configs
	secured.GET("/configs", h.GetConfigs)

	// Environments
	secured.GET("/environments", h.GetEnvironments)

	// Version
	secured.GET("/version", h.GetVersion)

	// Sandbox
	secured.GET("/sandbox/providers", h.ListSandboxProviders)
	secured.GET("/sandbox/providers/:provider_id/schema", h.GetSandboxProviderSchema)
	secured.GET("/sandbox/config", h.GetSandboxConfig)
	secured.POST("/sandbox/config", h.SetSandboxConfig)
	secured.POST("/sandbox/test", h.TestSandboxConnection)

	// Fingerprint
	secured.GET("/fingerprint", h.GetFingerprint)

	// License
	secured.POST("/license", h.SetLicense)
	secured.POST("/license/config", h.UpdateLicenseConfig)
	secured.GET("/license", h.ShowLicense)

	// Log level
	secured.GET("/log_level", h.GetLogLevel)
	secured.PUT("/log_level", h.SetLogLevel)

	// Providers and their models
	providers := secured.Group("/providers")
	providers.GET("/", h.ListProviders)
	providers.GET("/:provider_name", h.ShowProvider)
	providers.GET("/:provider_name/models", h.ListModels)
	providers.GET("/:provider_name/models/:model_name", h.ShowModel)

	// Message queue
	mq := secured.Group("/queue")
	mq.GET("/", h.ShowMessageQueue)
	mq.POST("/messages", h.PublishMessageToQueue)
	mq.GET("/messages", h.ListMessagesFromQueue)
	mq.PUT("/messages", h.PullMessageFromQueue)

	// Ingestors and ingestion tasks
	secured.GET("/ingestors", h.ListIngestors)
	secured.DELETE("/ingestors", h.ShutdownIngestor)
	secured.DELETE("/ingestion/tasks", h.RemoveIngestionTasks)
	secured.PUT("/ingestion/tasks", h.StopIngestionTasks)
	secured.GET("/ingestion/tasks", h.ListIngestionTasks)

	// Handle undefined routes
	engine.NoRoute(h.HandleNoRoute)
}