Go: add command, list, remove, stop tasks (#16190)

### What problem does this PR solve?

```
RAGFlow(admin)> stop user 'abc' ingestion tasks;
+-----------------------------------+-------+--------------------------------------------------------------------------+-------+
| command                           | email | error                                                                    | tasks |
+-----------------------------------+-------+--------------------------------------------------------------------------+-------+
| stop_ingestion_tasks_by_condition | abc   | 'Stop ingestion tasks by condition' is implemented in enterprise edition |       |
+-----------------------------------+-------+--------------------------------------------------------------------------+-------+
RAGFlow(admin)> stop user 'abc' ingestion tasks 'created;
+-----------------------------------+-------+--------------------------------------------------------------------------+----------+-------+
| command                           | email | error                                                                    | status   | tasks |
+-----------------------------------+-------+--------------------------------------------------------------------------+----------+-------+
| stop_ingestion_tasks_by_condition | abc   | 'Stop ingestion tasks by condition' is implemented in enterprise edition | created; |       |
+-----------------------------------+-------+--------------------------------------------------------------------------+----------+-------+
RAGFlow(admin)> stop user 'abc' ingestion tasks 'create';
+-----------------------------------+-------+--------------------------------------------------------------------------+--------+-------+
| command                           | email | error                                                                    | status | tasks |
+-----------------------------------+-------+--------------------------------------------------------------------------+--------+-------+
| stop_ingestion_tasks_by_condition | abc   | 'Stop ingestion tasks by condition' is implemented in enterprise edition | create |       |
+-----------------------------------+-------+--------------------------------------------------------------------------+--------+-------+
RAGFlow(admin)> remove user 'abc' ingestion tasks 'create';
+-------------------------------------+-------+----------------------------------------------------------------------------+--------+-------+
| command                             | email | error                                                                      | status | tasks |
+-------------------------------------+-------+----------------------------------------------------------------------------+--------+-------+
| remove_ingestion_tasks_by_condition | abc   | 'Remove ingestion tasks by condition' is implemented in enterprise edition | create |       |
+-------------------------------------+-------+----------------------------------------------------------------------------+--------+-------+
RAGFlow(admin)> remove user 'abc' ingestion tasks;
+-------------------------------------+-------+----------------------------------------------------------------------------+-------+
| command                             | email | error                                                                      | tasks |
+-------------------------------------+-------+----------------------------------------------------------------------------+-------+
| remove_ingestion_tasks_by_condition | abc   | 'Remove ingestion tasks by condition' is implemented in enterprise edition |       |
+-------------------------------------+-------+----------------------------------------------------------------------------+-------+
```

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2026-06-18 17:50:21 +08:00
committed by GitHub
parent 1a8ee8ba61
commit 3eb49ca7f8
5 changed files with 396 additions and 40 deletions

View File

@@ -17,6 +17,7 @@
package admin
import (
"fmt"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
@@ -498,3 +499,68 @@ func (s *Service) PurgeUsersData(preview bool, days int, userPlan *string, userA
return result, nil
}
func (s *Service) ListIngestionTasksByCondition(email, status *string) ([]map[string]interface{}, error) {
if email == nil && status == nil {
return nil, fmt.Errorf("email or status are required")
}
element := map[string]interface{}{
"command": "list_ingestion_tasks_by_condition",
"error": "'List ingestion tasks by condition' is implemented in enterprise edition",
}
if email != nil {
element["email"] = *email
}
if status != nil {
element["status"] = *status
}
return []map[string]interface{}{element}, nil
}
func (s *Service) StopIngestionTasksByCondition(tasks []string, email, status *string) ([]map[string]interface{}, error) {
if email == nil && status == nil {
return nil, fmt.Errorf("email or status are required")
}
element := map[string]interface{}{
"command": "stop_ingestion_tasks_by_condition",
"tasks": tasks,
"error": "'Stop ingestion tasks by condition' is implemented in enterprise edition",
}
if email != nil {
element["email"] = *email
}
if status != nil {
element["status"] = *status
}
return []map[string]interface{}{element}, nil
}
func (s *Service) RemoveIngestionTasksByCondition(tasks []string, email, status *string) ([]map[string]interface{}, error) {
if email == nil && status == nil {
return nil, fmt.Errorf("email or status are required")
}
element := map[string]interface{}{
"command": "remove_ingestion_tasks_by_condition",
"tasks": tasks,
"error": "'Remove ingestion tasks by condition' is implemented in enterprise edition",
}
if email != nil {
element["email"] = *email
}
if status != nil {
element["status"] = *status
}
return []map[string]interface{}{element}, nil
}

View File

@@ -1202,7 +1202,9 @@ func (h *Handler) ShowMessageQueue(c *gin.Context) {
}
type RemoveIngestionTaskRequest struct {
Tasks []string `json:"tasks" binding:"required"`
Tasks []string `json:"tasks"`
Email *string `json:"email"`
Status *string `json:"status"`
}
func (h *Handler) RemoveIngestionTasks(c *gin.Context) {
@@ -1212,17 +1214,28 @@ func (h *Handler) RemoveIngestionTasks(c *gin.Context) {
return
}
tasks, err := h.service.RemoveIngestionTasks(req.Tasks)
if err != nil {
errorResponse(c, err.Error(), 400)
return
}
if req.Email == nil && req.Status == nil {
tasks, err := h.service.RemoveIngestionTasks(req.Tasks)
if err != nil {
errorResponse(c, err.Error(), 400)
return
}
success(c, tasks, "Remove tasks successfully")
success(c, tasks, "Remove tasks successfully")
} else {
tasks, err := h.service.RemoveIngestionTasksByCondition(req.Tasks, req.Email, req.Status)
if err != nil {
errorResponse(c, err.Error(), 400)
return
}
success(c, tasks, "Remove tasks successfully")
}
}
type StopIngestionTaskRequest struct {
Tasks []string `json:"tasks" binding:"required"`
Tasks []string `json:"tasks"`
Email *string `json:"email"`
Status *string `json:"status"`
}
func (h *Handler) StopIngestionTasks(c *gin.Context) {
@@ -1232,26 +1245,47 @@ func (h *Handler) StopIngestionTasks(c *gin.Context) {
return
}
tasks, err := h.service.StopIngestionTasks(req.Tasks)
if err != nil {
errorResponse(c, err.Error(), 400)
return
}
if req.Email == nil && req.Status == nil {
tasks, err := h.service.StopIngestionTasks(req.Tasks)
if err != nil {
errorResponse(c, err.Error(), 400)
return
}
var result []map[string]string
for _, task := range tasks {
result = append(result, map[string]string{
"task_id": task.ID,
"status": task.Status,
})
}
var result []map[string]string
for _, task := range tasks {
result = append(result, map[string]string{
"task_id": task.ID,
"status": task.Status,
})
success(c, result, "Stop tasks successfully")
} else {
tasks, err := h.service.StopIngestionTasksByCondition(req.Tasks, req.Email, req.Status)
if err != nil {
errorResponse(c, err.Error(), 400)
return
}
success(c, tasks, "Stop tasks successfully")
}
}
success(c, result, "Stop tasks successfully")
type ListIngestionTasksRequest struct {
Email *string `json:"email"`
Status *string `json:"status"`
}
// ListIngestionTasks
func (h *Handler) ListIngestionTasks(c *gin.Context) {
tasks, err := h.service.ListIngestionTasks()
var err error
var tasks []map[string]interface{}
var req ListIngestionTasksRequest
if err = c.ShouldBindJSON(&req); err != nil {
tasks, err = h.service.ListIngestionTasks()
} else {
tasks, err = h.service.ListIngestionTasksByCondition(req.Email, req.Status)
}
if err != nil {
errorResponse(c, err.Error(), 500)
}

View File

@@ -2348,3 +2348,135 @@ func (c *CLI) AdminPurgeUsersCommand(cmd *Command) (ResponseIf, error) {
result.Duration = resp.Duration
return &result, nil
}
func (c *CLI) AdminListUserIngestionTasksCommand(cmd *Command) (ResponseIf, error) {
if c.Config.CLIMode != AdminMode || c.AdminServerClient.LoginToken == nil {
return nil, fmt.Errorf("this command is only allowed in ADMIN mode or already login")
}
userName, ok := cmd.Params["user_name"].(string)
if !ok {
return nil, fmt.Errorf("plan_name not provided")
}
payload := map[string]interface{}{
"email": userName,
}
status, ok := cmd.Params["status"].(string)
if ok {
payload["status"] = status
}
apiURL := "/admin/ingestion/tasks"
resp, err := c.AdminServerClient.Request("GET", apiURL, "admin", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to list user %s ingestion tasks: %w", userName, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to list user %s ingestion tasks: HTTP %d, body: %s", userName, resp.StatusCode, string(resp.Body))
}
var result CommonResponse
if err = json.Unmarshal(resp.Body, &result); err != nil {
return nil, fmt.Errorf("list user %s ingestion tasks failed: invalid JSON (%w)", userName, err)
}
if result.Code != 0 {
return nil, fmt.Errorf("%s", result.Message)
}
result.Duration = resp.Duration
return &result, nil
}
func (c *CLI) AdminStopUserIngestionTasksCommand(cmd *Command) (ResponseIf, error) {
if c.Config.CLIMode != AdminMode || c.AdminServerClient.LoginToken == nil {
return nil, fmt.Errorf("this command is only allowed in ADMIN mode or already login")
}
userName, ok := cmd.Params["user_name"].(string)
if !ok {
return nil, fmt.Errorf("user_name not provided")
}
payload := map[string]interface{}{
"email": userName,
}
status, ok := cmd.Params["status"].(string)
if ok {
payload["status"] = status
}
apiURL := "/admin/ingestion/tasks"
resp, err := c.AdminServerClient.Request("PUT", apiURL, "admin", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to stop user %s ingestion tasks: %w", userName, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to stop user %s ingestion tasks: HTTP %d, body: %s", userName, resp.StatusCode, string(resp.Body))
}
var result CommonResponse
if err = json.Unmarshal(resp.Body, &result); err != nil {
return nil, fmt.Errorf("stop user %s ingestion tasks failed: invalid JSON (%w)", userName, err)
}
if result.Code != 0 {
return nil, fmt.Errorf("%s", result.Message)
}
result.Duration = resp.Duration
return &result, nil
}
func (c *CLI) AdminRemoveUserIngestionTasksCommand(cmd *Command) (ResponseIf, error) {
if c.Config.CLIMode != AdminMode || c.AdminServerClient.LoginToken == nil {
return nil, fmt.Errorf("this command is only allowed in ADMIN mode or already login")
}
userName, ok := cmd.Params["user_name"].(string)
if !ok {
return nil, fmt.Errorf("user_name not provided")
}
payload := map[string]interface{}{
"email": userName,
}
status, ok := cmd.Params["status"].(string)
if ok {
payload["status"] = status
}
apiURL := "/admin/ingestion/tasks"
resp, err := c.AdminServerClient.Request("DELETE", apiURL, "admin", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to remove user %s ingestion tasks: %w", userName, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to remove user %s ingestion tasks: HTTP %d, body: %s", userName, resp.StatusCode, string(resp.Body))
}
var result CommonResponse
if err = json.Unmarshal(resp.Body, &result); err != nil {
return nil, fmt.Errorf("remove user %s ingestion tasks failed: invalid JSON (%w)", userName, err)
}
if result.Code != 0 {
return nil, fmt.Errorf("%s", result.Message)
}
result.Duration = resp.Duration
return &result, nil
}

View File

@@ -138,6 +138,8 @@ func (p *Parser) parseAdminListCommand() (*Command, error) {
return NewCommand("list_services"), nil
case TokenUsers:
return p.parseAdminListUsersCommand()
case TokenUser:
return p.parseAdminListUserCommand()
case TokenRoles:
p.nextToken()
// Semicolon is optional for SHOW TOKEN
@@ -383,26 +385,31 @@ func (p *Parser) parseAdminListIngestors() (*Command, error) {
func (p *Parser) parseAdminStopIngestionTasks() (*Command, error) {
p.nextToken() // consume STOP
if p.curToken.Type != TokenIngestion {
return nil, fmt.Errorf("expected INGESTION")
var cmd *Command
switch p.curToken.Type {
case TokenIngestion:
p.nextToken()
if p.curToken.Type != TokenTasks {
return nil, fmt.Errorf("expected TASKS")
}
p.nextToken() // consume TASK
taskString, err := p.parseQuotedString()
if err != nil {
return nil, err
}
tasks := strings.Split(taskString, " ")
p.nextToken() // consume TASK
cmd = NewCommand("admin_stop_ingestion_tasks")
cmd.Params["tasks"] = tasks
case TokenUser:
return p.parseAdminStopUserCommand()
default:
return nil, fmt.Errorf("expected USER or INGESTION")
}
p.nextToken()
if p.curToken.Type != TokenTasks {
return nil, fmt.Errorf("expected TASKS")
}
p.nextToken() // consume TASK
taskString, err := p.parseQuotedString()
if err != nil {
return nil, err
}
tasks := strings.Split(taskString, " ")
p.nextToken() // consume TASK
cmd := NewCommand("admin_stop_ingestion_tasks")
cmd.Params["tasks"] = tasks
// Semicolon is optional for UNSET TOKEN
if p.curToken.Type == TokenSemicolon {
@@ -1901,6 +1908,8 @@ func (p *Parser) parseAdminRemoveCommand() (*Command, error) {
cmd.Params["service_number"] = serviceNum
case TokenIngestion:
return p.parseAdminRemoveIngestionTasks()
case TokenUser:
return p.parseAdminRemoveUserCommand()
default:
return nil, fmt.Errorf("expected SERVICE")
}
@@ -2517,3 +2526,112 @@ commandLoop:
}
return cmd, nil
}
// LIST USER 'user@example.com' INGESTION TASKS;
func (p *Parser) parseAdminListUserCommand() (*Command, error) {
p.nextToken() // consume USER
userName, err := p.parseQuotedString()
if err != nil {
return nil, err
}
p.nextToken()
var cmd *Command
switch p.curToken.Type {
case TokenIngestion:
p.nextToken()
if p.curToken.Type != TokenTasks {
return nil, fmt.Errorf("expected TASKS after INGESTION")
}
p.nextToken()
cmd = NewCommand("admin_list_user_ingestion_tasks_command")
if p.curToken.Type == TokenQuotedString {
var status string
status, err = p.parseQuotedString()
if err != nil {
return nil, err
}
cmd.Params["status"] = status
p.nextToken()
}
default:
return nil, fmt.Errorf("expected INGESTION after USER")
}
cmd.Params["user_name"] = userName
return cmd, nil
}
// STOP USER 'user@example.com' INGESTION TASKS 'created';
func (p *Parser) parseAdminStopUserCommand() (*Command, error) {
p.nextToken() // consume USER
userName, err := p.parseQuotedString()
if err != nil {
return nil, err
}
p.nextToken()
var cmd *Command
switch p.curToken.Type {
case TokenIngestion:
p.nextToken()
if p.curToken.Type != TokenTasks {
return nil, fmt.Errorf("expected TASKS after INGESTION")
}
p.nextToken()
cmd = NewCommand("admin_stop_user_ingestion_tasks_command")
if p.curToken.Type == TokenQuotedString {
var status string
status, err = p.parseQuotedString()
if err != nil {
return nil, err
}
cmd.Params["status"] = status
p.nextToken()
}
default:
return nil, fmt.Errorf("expected INGESTION after USER")
}
cmd.Params["user_name"] = userName
return cmd, nil
}
// REMOVE USER 'user@example.com' INGESTION TASKS 'created';
func (p *Parser) parseAdminRemoveUserCommand() (*Command, error) {
p.nextToken() // consume USER
userName, err := p.parseQuotedString()
if err != nil {
return nil, err
}
p.nextToken()
var cmd *Command
switch p.curToken.Type {
case TokenIngestion:
p.nextToken()
if p.curToken.Type != TokenTasks {
return nil, fmt.Errorf("expected TASKS after INGESTION")
}
p.nextToken()
cmd = NewCommand("admin_remove_user_ingestion_tasks_command")
if p.curToken.Type == TokenQuotedString {
var status string
status, err = p.parseQuotedString()
if err != nil {
return nil, err
}
cmd.Params["status"] = status
p.nextToken()
}
default:
return nil, fmt.Errorf("expected INGESTION after USER")
}
cmd.Params["user_name"] = userName
return cmd, nil
}

View File

@@ -161,6 +161,12 @@ func (c *CLI) ExecuteAdminCommand(cmd *Command) (ResponseIf, error) {
return c.AdminPurgeUserCommand(cmd)
case "admin_purge_users_command":
return c.AdminPurgeUsersCommand(cmd)
case "admin_list_user_ingestion_tasks_command":
return c.AdminListUserIngestionTasksCommand(cmd)
case "admin_stop_user_ingestion_tasks_command":
return c.AdminStopUserIngestionTasksCommand(cmd)
case "admin_remove_user_ingestion_tasks_command":
return c.AdminRemoveUserIngestionTasksCommand(cmd)
// TODO: Implement other commands
case "show_admin_server":
return c.ShowAdminServer(cmd)