Go: Admin list ingestion tasks (#14695)

### What problem does this PR solve?

```
RAGFlow(admin)> list tasks;
+-------------+------------------+----------------------------------+-------------+-----------+----------------------------------+----------+----------------------+-------------+-----------+---------+
| chunk_count | digest           | document_id                      | duration    | from_page | id                               | priority | progress             | retry_count | task_type | to_page |
+-------------+------------------+----------------------------------+-------------+-----------+----------------------------------+----------+----------------------+-------------+-----------+---------+
| 16          | 8a0016a0dc3cbdbb | f6aa38bb4ad111f1ba6338a74640adcc | 1511.156966 | 0         | f91e4f104ad111f1aaaf38a74640adcc | 0        | 1                    | 1           |           | 12      |
+-------------+------------------+----------------------------------+-------------+-----------+----------------------------------+----------+----------------------+-------------+-----------+---------+
```

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2026-05-09 10:03:23 +08:00
committed by GitHub
parent 5e96c5cae6
commit b6abce50b1
9 changed files with 90 additions and 0 deletions

View File

@@ -208,6 +208,15 @@ func (h *Handler) AuthCheck(c *gin.Context) {
successNoData(c, "Admin is authorized")
}
// ListTasks handle list tasks
func (h *Handler) ListTasks(c *gin.Context) {
tasks, err := h.service.ListTasks()
if err != nil {
errorResponse(c, err.Error(), 500)
}
success(c, tasks, "Get all tasks")
}
// ListUsers handle list users
func (h *Handler) ListUsers(c *gin.Context) {
users, err := h.service.ListUsers()

View File

@@ -55,6 +55,9 @@ func (r *Router) Setup(engine *gin.Engine) {
// Auth
protected.GET("/auth", r.handler.AuthCheck)
// Tasks
protected.GET("/tasks", r.handler.ListTasks)
// User management
protected.GET("/users", r.handler.ListUsers)
protected.POST("/users", r.handler.CreateUser)

View File

@@ -34,6 +34,7 @@ import (
"ragflow/internal/utility"
"regexp"
"strconv"
"strings"
"time"
"go.uber.org/zap"
@@ -100,6 +101,37 @@ func (s *Service) Logout(user interface{}) error {
return nil
}
// ListTasks
func (s *Service) ListTasks() ([]map[string]interface{}, error) {
tasks, err := s.taskDAO.GetAllTasks()
if err != nil {
return nil, err
}
var result []map[string]interface{}
for _, task := range tasks {
// task.ChunkIDs is a string, delimiter is space, count the word count
ChunkCount := strings.Count(*task.ChunkIDs, " ")
result = append(result, map[string]interface{}{
"id": task.ID,
"task_type": task.TaskType,
"document_id": task.DocID,
"chunk_count": ChunkCount,
"from_page": task.FromPage,
"to_page": task.ToPage,
"priority": task.Priority,
"duration": task.ProcessDuration,
"progress": task.Progress,
//"message": *task.ProgressMsg,
"retry_count": task.RetryCount,
"digest": task.Digest,
})
}
return result, nil
}
// GetUserByToken get user by access token
func (s *Service) GetUserByToken(token string) (*entity.User, error) {
user, err := s.userDAO.GetByAccessToken(token)

View File

@@ -1118,3 +1118,30 @@ func (c *RAGFlowClient) DropAdminToken(cmd *Command) (ResponseIf, error) {
result.Duration = resp.Duration
return &result, nil
}
func (c *RAGFlowClient) ListAdminTasks(cmd *Command) (ResponseIf, error) {
if c.ServerType != "admin" {
return nil, fmt.Errorf("this command is only allowed in ADMIN mode")
}
resp, err := c.HTTPClient.Request("GET", "/admin/tasks", "admin", nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to drop token: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to drop token: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
}
var result CommonResponse
if err = json.Unmarshal(resp.Body, &result); err != nil {
return nil, fmt.Errorf("drop token failed: invalid JSON (%w)", err)
}
if result.Code != 0 {
return nil, fmt.Errorf("%s", result.Message)
}
result.Duration = resp.Duration
return &result, nil
}

View File

@@ -190,6 +190,8 @@ func (p *Parser) parseAdminListCommand() (*Command, error) {
return NewCommand("list_user_chats"), nil
case TokenFiles:
return p.parseAdminListFiles()
case TokenTasks:
return p.parseAdminListTasks()
default:
return nil, fmt.Errorf("unknown LIST target: %s", p.curToken.Value)
}
@@ -368,6 +370,12 @@ func (p *Parser) parseAdminListFiles() (*Command, error) {
return cmd, nil
}
func (p *Parser) parseAdminListTasks() (*Command, error) {
p.nextToken() // consume TASKS
cmd := NewCommand("list_admin_tasks")
return cmd, nil
}
func (p *Parser) parseAdminShowCommand() (*Command, error) {
p.nextToken() // consume SHOW

View File

@@ -177,6 +177,8 @@ func (c *RAGFlowClient) ExecuteAdminCommand(cmd *Command) (ResponseIf, error) {
return c.ListInstanceModels(cmd)
case "show_model":
return c.ShowModel(cmd)
case "list_admin_tasks":
return c.ListAdminTasks(cmd)
// TODO: Implement other commands
default:
return nil, fmt.Errorf("command '%s' would be executed with API", cmd.Type)

View File

@@ -427,6 +427,8 @@ func (l *Lexer) lookupIdent(ident string) Token {
return Token{Type: TokenRegion, Value: ident}
case "URL":
return Token{Type: TokenURL, Value: ident}
case "TASKS":
return Token{Type: TokenTasks, Value: ident}
case "LOG":
return Token{Type: TokenLog, Value: ident}
case "LEVEL":

View File

@@ -143,6 +143,7 @@ const (
TokenTag
TokenRegion
TokenURL
TokenTasks
TokenLog
TokenLevel
TokenDebug

View File

@@ -57,3 +57,9 @@ func (dao *TaskDAO) DeleteByTenantID(tenantID string) (int64, error) {
result := DB.Unscoped().Where("doc_id IN (SELECT id FROM document WHERE tenant_id = ?)", tenantID).Delete(&entity.Task{})
return result.RowsAffected, result.Error
}
func (dao *TaskDAO) GetAllTasks() ([]*entity.Task, error) {
var tasks []*entity.Task
err := DB.Find(&tasks).Error
return tasks, err
}