From 3eb49ca7f80eee328acae03be053881db5690e80 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Thu, 18 Jun 2026 17:50:21 +0800 Subject: [PATCH] Go: add command, list, remove, stop tasks (#16190) ### What problem does this PR solve? ``` RAGFlow(admin)> stop user 'abc' ingestion tasks; +-----------------------------------+-------+--------------------------------------------------------------------------+-------+ | command | email | error | tasks | +-----------------------------------+-------+--------------------------------------------------------------------------+-------+ | stop_ingestion_tasks_by_condition | abc | 'Stop ingestion tasks by condition' is implemented in enterprise edition | | +-----------------------------------+-------+--------------------------------------------------------------------------+-------+ RAGFlow(admin)> stop user 'abc' ingestion tasks 'created; +-----------------------------------+-------+--------------------------------------------------------------------------+----------+-------+ | command | email | error | status | tasks | +-----------------------------------+-------+--------------------------------------------------------------------------+----------+-------+ | stop_ingestion_tasks_by_condition | abc | 'Stop ingestion tasks by condition' is implemented in enterprise edition | created; | | +-----------------------------------+-------+--------------------------------------------------------------------------+----------+-------+ RAGFlow(admin)> stop user 'abc' ingestion tasks 'create'; +-----------------------------------+-------+--------------------------------------------------------------------------+--------+-------+ | command | email | error | status | tasks | +-----------------------------------+-------+--------------------------------------------------------------------------+--------+-------+ | stop_ingestion_tasks_by_condition | abc | 'Stop ingestion tasks by condition' is implemented in enterprise edition | create | | +-----------------------------------+-------+--------------------------------------------------------------------------+--------+-------+ RAGFlow(admin)> remove user 'abc' ingestion tasks 'create'; +-------------------------------------+-------+----------------------------------------------------------------------------+--------+-------+ | command | email | error | status | tasks | +-------------------------------------+-------+----------------------------------------------------------------------------+--------+-------+ | remove_ingestion_tasks_by_condition | abc | 'Remove ingestion tasks by condition' is implemented in enterprise edition | create | | +-------------------------------------+-------+----------------------------------------------------------------------------+--------+-------+ RAGFlow(admin)> remove user 'abc' ingestion tasks; +-------------------------------------+-------+----------------------------------------------------------------------------+-------+ | command | email | error | tasks | +-------------------------------------+-------+----------------------------------------------------------------------------+-------+ | remove_ingestion_tasks_by_condition | abc | 'Remove ingestion tasks by condition' is implemented in enterprise edition | | +-------------------------------------+-------+----------------------------------------------------------------------------+-------+ ``` ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai --- internal/admin/enterprise_service.go | 66 ++++++++++++ internal/admin/handler.go | 76 +++++++++---- internal/cli/admin_command.go | 132 +++++++++++++++++++++++ internal/cli/admin_parser.go | 156 +++++++++++++++++++++++---- internal/cli/cli_http.go | 6 ++ 5 files changed, 396 insertions(+), 40 deletions(-) diff --git a/internal/admin/enterprise_service.go b/internal/admin/enterprise_service.go index 6593e9219f..51a685ddcb 100644 --- a/internal/admin/enterprise_service.go +++ b/internal/admin/enterprise_service.go @@ -17,6 +17,7 @@ package admin import ( + "fmt" "ragflow/internal/common" "ragflow/internal/dao" "ragflow/internal/entity" @@ -498,3 +499,68 @@ func (s *Service) PurgeUsersData(preview bool, days int, userPlan *string, userA return result, nil } + +func (s *Service) ListIngestionTasksByCondition(email, status *string) ([]map[string]interface{}, error) { + + if email == nil && status == nil { + return nil, fmt.Errorf("email or status are required") + } + + element := map[string]interface{}{ + "command": "list_ingestion_tasks_by_condition", + "error": "'List ingestion tasks by condition' is implemented in enterprise edition", + } + + if email != nil { + element["email"] = *email + } + if status != nil { + element["status"] = *status + } + + return []map[string]interface{}{element}, nil +} + +func (s *Service) StopIngestionTasksByCondition(tasks []string, email, status *string) ([]map[string]interface{}, error) { + + if email == nil && status == nil { + return nil, fmt.Errorf("email or status are required") + } + + element := map[string]interface{}{ + "command": "stop_ingestion_tasks_by_condition", + "tasks": tasks, + "error": "'Stop ingestion tasks by condition' is implemented in enterprise edition", + } + + if email != nil { + element["email"] = *email + } + if status != nil { + element["status"] = *status + } + + return []map[string]interface{}{element}, nil +} + +func (s *Service) RemoveIngestionTasksByCondition(tasks []string, email, status *string) ([]map[string]interface{}, error) { + + if email == nil && status == nil { + return nil, fmt.Errorf("email or status are required") + } + + element := map[string]interface{}{ + "command": "remove_ingestion_tasks_by_condition", + "tasks": tasks, + "error": "'Remove ingestion tasks by condition' is implemented in enterprise edition", + } + + if email != nil { + element["email"] = *email + } + if status != nil { + element["status"] = *status + } + + return []map[string]interface{}{element}, nil +} diff --git a/internal/admin/handler.go b/internal/admin/handler.go index f60e8cc3ff..51f1a2afa2 100644 --- a/internal/admin/handler.go +++ b/internal/admin/handler.go @@ -1202,7 +1202,9 @@ func (h *Handler) ShowMessageQueue(c *gin.Context) { } type RemoveIngestionTaskRequest struct { - Tasks []string `json:"tasks" binding:"required"` + Tasks []string `json:"tasks"` + Email *string `json:"email"` + Status *string `json:"status"` } func (h *Handler) RemoveIngestionTasks(c *gin.Context) { @@ -1212,17 +1214,28 @@ func (h *Handler) RemoveIngestionTasks(c *gin.Context) { return } - tasks, err := h.service.RemoveIngestionTasks(req.Tasks) - if err != nil { - errorResponse(c, err.Error(), 400) - return - } + if req.Email == nil && req.Status == nil { + tasks, err := h.service.RemoveIngestionTasks(req.Tasks) + if err != nil { + errorResponse(c, err.Error(), 400) + return + } - success(c, tasks, "Remove tasks successfully") + success(c, tasks, "Remove tasks successfully") + } else { + tasks, err := h.service.RemoveIngestionTasksByCondition(req.Tasks, req.Email, req.Status) + if err != nil { + errorResponse(c, err.Error(), 400) + return + } + success(c, tasks, "Remove tasks successfully") + } } type StopIngestionTaskRequest struct { - Tasks []string `json:"tasks" binding:"required"` + Tasks []string `json:"tasks"` + Email *string `json:"email"` + Status *string `json:"status"` } func (h *Handler) StopIngestionTasks(c *gin.Context) { @@ -1232,26 +1245,47 @@ func (h *Handler) StopIngestionTasks(c *gin.Context) { return } - tasks, err := h.service.StopIngestionTasks(req.Tasks) - if err != nil { - errorResponse(c, err.Error(), 400) - return - } + if req.Email == nil && req.Status == nil { + tasks, err := h.service.StopIngestionTasks(req.Tasks) + if err != nil { + errorResponse(c, err.Error(), 400) + return + } + var result []map[string]string + for _, task := range tasks { + result = append(result, map[string]string{ + "task_id": task.ID, + "status": task.Status, + }) + } - var result []map[string]string - for _, task := range tasks { - result = append(result, map[string]string{ - "task_id": task.ID, - "status": task.Status, - }) + success(c, result, "Stop tasks successfully") + } else { + tasks, err := h.service.StopIngestionTasksByCondition(req.Tasks, req.Email, req.Status) + if err != nil { + errorResponse(c, err.Error(), 400) + return + } + success(c, tasks, "Stop tasks successfully") } +} - success(c, result, "Stop tasks successfully") +type ListIngestionTasksRequest struct { + Email *string `json:"email"` + Status *string `json:"status"` } // ListIngestionTasks func (h *Handler) ListIngestionTasks(c *gin.Context) { - tasks, err := h.service.ListIngestionTasks() + var err error + var tasks []map[string]interface{} + var req ListIngestionTasksRequest + if err = c.ShouldBindJSON(&req); err != nil { + tasks, err = h.service.ListIngestionTasks() + } else { + tasks, err = h.service.ListIngestionTasksByCondition(req.Email, req.Status) + } + if err != nil { errorResponse(c, err.Error(), 500) } diff --git a/internal/cli/admin_command.go b/internal/cli/admin_command.go index 27c5d9542f..f53d5f9d99 100644 --- a/internal/cli/admin_command.go +++ b/internal/cli/admin_command.go @@ -2348,3 +2348,135 @@ func (c *CLI) AdminPurgeUsersCommand(cmd *Command) (ResponseIf, error) { result.Duration = resp.Duration return &result, nil } + +func (c *CLI) AdminListUserIngestionTasksCommand(cmd *Command) (ResponseIf, error) { + + if c.Config.CLIMode != AdminMode || c.AdminServerClient.LoginToken == nil { + return nil, fmt.Errorf("this command is only allowed in ADMIN mode or already login") + } + + userName, ok := cmd.Params["user_name"].(string) + if !ok { + return nil, fmt.Errorf("plan_name not provided") + } + + payload := map[string]interface{}{ + "email": userName, + } + + status, ok := cmd.Params["status"].(string) + if ok { + payload["status"] = status + } + + apiURL := "/admin/ingestion/tasks" + + resp, err := c.AdminServerClient.Request("GET", apiURL, "admin", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to list user %s ingestion tasks: %w", userName, err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to list user %s ingestion tasks: HTTP %d, body: %s", userName, resp.StatusCode, string(resp.Body)) + } + + var result CommonResponse + if err = json.Unmarshal(resp.Body, &result); err != nil { + return nil, fmt.Errorf("list user %s ingestion tasks failed: invalid JSON (%w)", userName, err) + } + + if result.Code != 0 { + return nil, fmt.Errorf("%s", result.Message) + } + + result.Duration = resp.Duration + return &result, nil +} + +func (c *CLI) AdminStopUserIngestionTasksCommand(cmd *Command) (ResponseIf, error) { + + if c.Config.CLIMode != AdminMode || c.AdminServerClient.LoginToken == nil { + return nil, fmt.Errorf("this command is only allowed in ADMIN mode or already login") + } + + userName, ok := cmd.Params["user_name"].(string) + if !ok { + return nil, fmt.Errorf("user_name not provided") + } + + payload := map[string]interface{}{ + "email": userName, + } + + status, ok := cmd.Params["status"].(string) + if ok { + payload["status"] = status + } + + apiURL := "/admin/ingestion/tasks" + + resp, err := c.AdminServerClient.Request("PUT", apiURL, "admin", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to stop user %s ingestion tasks: %w", userName, err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to stop user %s ingestion tasks: HTTP %d, body: %s", userName, resp.StatusCode, string(resp.Body)) + } + + var result CommonResponse + if err = json.Unmarshal(resp.Body, &result); err != nil { + return nil, fmt.Errorf("stop user %s ingestion tasks failed: invalid JSON (%w)", userName, err) + } + + if result.Code != 0 { + return nil, fmt.Errorf("%s", result.Message) + } + + result.Duration = resp.Duration + return &result, nil +} + +func (c *CLI) AdminRemoveUserIngestionTasksCommand(cmd *Command) (ResponseIf, error) { + + if c.Config.CLIMode != AdminMode || c.AdminServerClient.LoginToken == nil { + return nil, fmt.Errorf("this command is only allowed in ADMIN mode or already login") + } + + userName, ok := cmd.Params["user_name"].(string) + if !ok { + return nil, fmt.Errorf("user_name not provided") + } + + payload := map[string]interface{}{ + "email": userName, + } + + status, ok := cmd.Params["status"].(string) + if ok { + payload["status"] = status + } + + apiURL := "/admin/ingestion/tasks" + + resp, err := c.AdminServerClient.Request("DELETE", apiURL, "admin", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to remove user %s ingestion tasks: %w", userName, err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to remove user %s ingestion tasks: HTTP %d, body: %s", userName, resp.StatusCode, string(resp.Body)) + } + + var result CommonResponse + if err = json.Unmarshal(resp.Body, &result); err != nil { + return nil, fmt.Errorf("remove user %s ingestion tasks failed: invalid JSON (%w)", userName, err) + } + + if result.Code != 0 { + return nil, fmt.Errorf("%s", result.Message) + } + + result.Duration = resp.Duration + return &result, nil +} diff --git a/internal/cli/admin_parser.go b/internal/cli/admin_parser.go index 107571f765..1ca3969ec3 100644 --- a/internal/cli/admin_parser.go +++ b/internal/cli/admin_parser.go @@ -138,6 +138,8 @@ func (p *Parser) parseAdminListCommand() (*Command, error) { return NewCommand("list_services"), nil case TokenUsers: return p.parseAdminListUsersCommand() + case TokenUser: + return p.parseAdminListUserCommand() case TokenRoles: p.nextToken() // Semicolon is optional for SHOW TOKEN @@ -383,26 +385,31 @@ func (p *Parser) parseAdminListIngestors() (*Command, error) { func (p *Parser) parseAdminStopIngestionTasks() (*Command, error) { p.nextToken() // consume STOP - if p.curToken.Type != TokenIngestion { - return nil, fmt.Errorf("expected INGESTION") + var cmd *Command + + switch p.curToken.Type { + case TokenIngestion: + p.nextToken() + if p.curToken.Type != TokenTasks { + return nil, fmt.Errorf("expected TASKS") + } + p.nextToken() // consume TASK + + taskString, err := p.parseQuotedString() + if err != nil { + return nil, err + } + + tasks := strings.Split(taskString, " ") + p.nextToken() // consume TASK + + cmd = NewCommand("admin_stop_ingestion_tasks") + cmd.Params["tasks"] = tasks + case TokenUser: + return p.parseAdminStopUserCommand() + default: + return nil, fmt.Errorf("expected USER or INGESTION") } - p.nextToken() - - if p.curToken.Type != TokenTasks { - return nil, fmt.Errorf("expected TASKS") - } - p.nextToken() // consume TASK - - taskString, err := p.parseQuotedString() - if err != nil { - return nil, err - } - - tasks := strings.Split(taskString, " ") - p.nextToken() // consume TASK - - cmd := NewCommand("admin_stop_ingestion_tasks") - cmd.Params["tasks"] = tasks // Semicolon is optional for UNSET TOKEN if p.curToken.Type == TokenSemicolon { @@ -1901,6 +1908,8 @@ func (p *Parser) parseAdminRemoveCommand() (*Command, error) { cmd.Params["service_number"] = serviceNum case TokenIngestion: return p.parseAdminRemoveIngestionTasks() + case TokenUser: + return p.parseAdminRemoveUserCommand() default: return nil, fmt.Errorf("expected SERVICE") } @@ -2517,3 +2526,112 @@ commandLoop: } return cmd, nil } + +// LIST USER 'user@example.com' INGESTION TASKS; +func (p *Parser) parseAdminListUserCommand() (*Command, error) { + p.nextToken() // consume USER + + userName, err := p.parseQuotedString() + if err != nil { + return nil, err + } + p.nextToken() + + var cmd *Command + + switch p.curToken.Type { + case TokenIngestion: + p.nextToken() + if p.curToken.Type != TokenTasks { + return nil, fmt.Errorf("expected TASKS after INGESTION") + } + p.nextToken() + cmd = NewCommand("admin_list_user_ingestion_tasks_command") + if p.curToken.Type == TokenQuotedString { + var status string + status, err = p.parseQuotedString() + if err != nil { + return nil, err + } + cmd.Params["status"] = status + p.nextToken() + } + default: + return nil, fmt.Errorf("expected INGESTION after USER") + } + + cmd.Params["user_name"] = userName + return cmd, nil +} + +// STOP USER 'user@example.com' INGESTION TASKS 'created'; +func (p *Parser) parseAdminStopUserCommand() (*Command, error) { + p.nextToken() // consume USER + + userName, err := p.parseQuotedString() + if err != nil { + return nil, err + } + p.nextToken() + + var cmd *Command + switch p.curToken.Type { + case TokenIngestion: + p.nextToken() + if p.curToken.Type != TokenTasks { + return nil, fmt.Errorf("expected TASKS after INGESTION") + } + p.nextToken() + cmd = NewCommand("admin_stop_user_ingestion_tasks_command") + if p.curToken.Type == TokenQuotedString { + var status string + status, err = p.parseQuotedString() + if err != nil { + return nil, err + } + cmd.Params["status"] = status + p.nextToken() + } + default: + return nil, fmt.Errorf("expected INGESTION after USER") + } + + cmd.Params["user_name"] = userName + return cmd, nil +} + +// REMOVE USER 'user@example.com' INGESTION TASKS 'created'; +func (p *Parser) parseAdminRemoveUserCommand() (*Command, error) { + p.nextToken() // consume USER + + userName, err := p.parseQuotedString() + if err != nil { + return nil, err + } + p.nextToken() + + var cmd *Command + switch p.curToken.Type { + case TokenIngestion: + p.nextToken() + if p.curToken.Type != TokenTasks { + return nil, fmt.Errorf("expected TASKS after INGESTION") + } + p.nextToken() + cmd = NewCommand("admin_remove_user_ingestion_tasks_command") + if p.curToken.Type == TokenQuotedString { + var status string + status, err = p.parseQuotedString() + if err != nil { + return nil, err + } + cmd.Params["status"] = status + p.nextToken() + } + default: + return nil, fmt.Errorf("expected INGESTION after USER") + } + + cmd.Params["user_name"] = userName + return cmd, nil +} diff --git a/internal/cli/cli_http.go b/internal/cli/cli_http.go index 57b3e5d3fc..9c07579117 100644 --- a/internal/cli/cli_http.go +++ b/internal/cli/cli_http.go @@ -161,6 +161,12 @@ func (c *CLI) ExecuteAdminCommand(cmd *Command) (ResponseIf, error) { return c.AdminPurgeUserCommand(cmd) case "admin_purge_users_command": return c.AdminPurgeUsersCommand(cmd) + case "admin_list_user_ingestion_tasks_command": + return c.AdminListUserIngestionTasksCommand(cmd) + case "admin_stop_user_ingestion_tasks_command": + return c.AdminStopUserIngestionTasksCommand(cmd) + case "admin_remove_user_ingestion_tasks_command": + return c.AdminRemoveUserIngestionTasksCommand(cmd) // TODO: Implement other commands case "show_admin_server": return c.ShowAdminServer(cmd)