From b6abce50b1cb01403ad6a297ce8fda8c021e84b8 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Sat, 9 May 2026 10:03:23 +0800 Subject: [PATCH] Go: Admin list ingestion tasks (#14695) ### What problem does this PR solve? ``` RAGFlow(admin)> list tasks; +-------------+------------------+----------------------------------+-------------+-----------+----------------------------------+----------+----------------------+-------------+-----------+---------+ | chunk_count | digest | document_id | duration | from_page | id | priority | progress | retry_count | task_type | to_page | +-------------+------------------+----------------------------------+-------------+-----------+----------------------------------+----------+----------------------+-------------+-----------+---------+ | 16 | 8a0016a0dc3cbdbb | f6aa38bb4ad111f1ba6338a74640adcc | 1511.156966 | 0 | f91e4f104ad111f1aaaf38a74640adcc | 0 | 1 | 1 | | 12 | +-------------+------------------+----------------------------------+-------------+-----------+----------------------------------+----------+----------------------+-------------+-----------+---------+ ``` ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai --- internal/admin/handler.go | 9 +++++++++ internal/admin/router.go | 3 +++ internal/admin/service.go | 32 ++++++++++++++++++++++++++++++++ internal/cli/admin_command.go | 27 +++++++++++++++++++++++++++ internal/cli/admin_parser.go | 8 ++++++++ internal/cli/client.go | 2 ++ internal/cli/lexer.go | 2 ++ internal/cli/types.go | 1 + internal/dao/task.go | 6 ++++++ 9 files changed, 90 insertions(+) diff --git a/internal/admin/handler.go b/internal/admin/handler.go index ee823d5dfe..b267baf5be 100644 --- a/internal/admin/handler.go +++ b/internal/admin/handler.go @@ -208,6 +208,15 @@ func (h *Handler) AuthCheck(c *gin.Context) { successNoData(c, "Admin is authorized") } +// ListTasks handle list tasks +func (h *Handler) ListTasks(c *gin.Context) { + tasks, err := h.service.ListTasks() + if err != nil { + errorResponse(c, err.Error(), 500) + } + success(c, tasks, "Get all tasks") +} + // ListUsers handle list users func (h *Handler) ListUsers(c *gin.Context) { users, err := h.service.ListUsers() diff --git a/internal/admin/router.go b/internal/admin/router.go index fe3e54d22a..03aa3300b6 100644 --- a/internal/admin/router.go +++ b/internal/admin/router.go @@ -55,6 +55,9 @@ func (r *Router) Setup(engine *gin.Engine) { // Auth protected.GET("/auth", r.handler.AuthCheck) + // Tasks + protected.GET("/tasks", r.handler.ListTasks) + // User management protected.GET("/users", r.handler.ListUsers) protected.POST("/users", r.handler.CreateUser) diff --git a/internal/admin/service.go b/internal/admin/service.go index acd411f259..2b6e282eff 100644 --- a/internal/admin/service.go +++ b/internal/admin/service.go @@ -34,6 +34,7 @@ import ( "ragflow/internal/utility" "regexp" "strconv" + "strings" "time" "go.uber.org/zap" @@ -100,6 +101,37 @@ func (s *Service) Logout(user interface{}) error { return nil } +// ListTasks +func (s *Service) ListTasks() ([]map[string]interface{}, error) { + + tasks, err := s.taskDAO.GetAllTasks() + if err != nil { + return nil, err + } + + var result []map[string]interface{} + for _, task := range tasks { + // task.ChunkIDs is a string, delimiter is space, count the word count + ChunkCount := strings.Count(*task.ChunkIDs, " ") + result = append(result, map[string]interface{}{ + "id": task.ID, + "task_type": task.TaskType, + "document_id": task.DocID, + "chunk_count": ChunkCount, + "from_page": task.FromPage, + "to_page": task.ToPage, + "priority": task.Priority, + "duration": task.ProcessDuration, + "progress": task.Progress, + //"message": *task.ProgressMsg, + "retry_count": task.RetryCount, + "digest": task.Digest, + }) + } + + return result, nil +} + // GetUserByToken get user by access token func (s *Service) GetUserByToken(token string) (*entity.User, error) { user, err := s.userDAO.GetByAccessToken(token) diff --git a/internal/cli/admin_command.go b/internal/cli/admin_command.go index 4b7afe52a8..f6ab603af5 100644 --- a/internal/cli/admin_command.go +++ b/internal/cli/admin_command.go @@ -1118,3 +1118,30 @@ func (c *RAGFlowClient) DropAdminToken(cmd *Command) (ResponseIf, error) { result.Duration = resp.Duration return &result, nil } + +func (c *RAGFlowClient) ListAdminTasks(cmd *Command) (ResponseIf, error) { + if c.ServerType != "admin" { + return nil, fmt.Errorf("this command is only allowed in ADMIN mode") + } + + resp, err := c.HTTPClient.Request("GET", "/admin/tasks", "admin", nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to drop token: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to drop token: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + var result CommonResponse + if err = json.Unmarshal(resp.Body, &result); err != nil { + return nil, fmt.Errorf("drop token failed: invalid JSON (%w)", err) + } + + if result.Code != 0 { + return nil, fmt.Errorf("%s", result.Message) + } + + result.Duration = resp.Duration + return &result, nil +} diff --git a/internal/cli/admin_parser.go b/internal/cli/admin_parser.go index ef0394b189..c1b2edab5a 100644 --- a/internal/cli/admin_parser.go +++ b/internal/cli/admin_parser.go @@ -190,6 +190,8 @@ func (p *Parser) parseAdminListCommand() (*Command, error) { return NewCommand("list_user_chats"), nil case TokenFiles: return p.parseAdminListFiles() + case TokenTasks: + return p.parseAdminListTasks() default: return nil, fmt.Errorf("unknown LIST target: %s", p.curToken.Value) } @@ -368,6 +370,12 @@ func (p *Parser) parseAdminListFiles() (*Command, error) { return cmd, nil } +func (p *Parser) parseAdminListTasks() (*Command, error) { + p.nextToken() // consume TASKS + cmd := NewCommand("list_admin_tasks") + return cmd, nil +} + func (p *Parser) parseAdminShowCommand() (*Command, error) { p.nextToken() // consume SHOW diff --git a/internal/cli/client.go b/internal/cli/client.go index e71e2fd6a0..2a0a013799 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -177,6 +177,8 @@ func (c *RAGFlowClient) ExecuteAdminCommand(cmd *Command) (ResponseIf, error) { return c.ListInstanceModels(cmd) case "show_model": return c.ShowModel(cmd) + case "list_admin_tasks": + return c.ListAdminTasks(cmd) // TODO: Implement other commands default: return nil, fmt.Errorf("command '%s' would be executed with API", cmd.Type) diff --git a/internal/cli/lexer.go b/internal/cli/lexer.go index 11b4b8c013..59c23646ee 100644 --- a/internal/cli/lexer.go +++ b/internal/cli/lexer.go @@ -427,6 +427,8 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenRegion, Value: ident} case "URL": return Token{Type: TokenURL, Value: ident} + case "TASKS": + return Token{Type: TokenTasks, Value: ident} case "LOG": return Token{Type: TokenLog, Value: ident} case "LEVEL": diff --git a/internal/cli/types.go b/internal/cli/types.go index 25490797d9..9a373df87a 100644 --- a/internal/cli/types.go +++ b/internal/cli/types.go @@ -143,6 +143,7 @@ const ( TokenTag TokenRegion TokenURL + TokenTasks TokenLog TokenLevel TokenDebug diff --git a/internal/dao/task.go b/internal/dao/task.go index 1e879bffc7..30bb3fbbea 100644 --- a/internal/dao/task.go +++ b/internal/dao/task.go @@ -57,3 +57,9 @@ func (dao *TaskDAO) DeleteByTenantID(tenantID string) (int64, error) { result := DB.Unscoped().Where("doc_id IN (SELECT id FROM document WHERE tenant_id = ?)", tenantID).Delete(&entity.Task{}) return result.RowsAffected, result.Error } + +func (dao *TaskDAO) GetAllTasks() ([]*entity.Task, error) { + var tasks []*entity.Task + err := DB.Find(&tasks).Error + return tasks, err +}