Go: add ingestion server (#15094)

### What problem does this PR solve?

1. Go ingestion server will connected with admin server with gRPC stream
2. Go ingestion server will be responsible for ingestion tasks
```

RAGFlow(admin)> list ingestors;
+-----------------+-----------+----------------------------------+---------------------------+----------+------------+--------------+--------+------------+---------------+
| address         | cpu_usage | id                               | last_heartbeat            | name     | process_id | rss_usage    | status | task_count | vms_usage     |
+-----------------+-----------+----------------------------------+---------------------------+----------+------------+--------------+--------+------------+---------------+
| 127.0.0.1:58564 | 0         | bdd1870eea2646e0aacb8a2cd3307aa2 | 2026-05-24T18:16:17+08:00 | ingestor | 680152     | 212.72265625 | active | 0          | 2589.12109375 |
+-----------------+-----------+----------------------------------+---------------------------+----------+------------+--------------+--------+------------+---------------+

RAGFlow(admin)> start ingestion 'abc';
+----------------------------------+
| task_id                          |
+----------------------------------+
| e714777639ca4760ab427b5f211e81ad |
+----------------------------------+

RAGFlow(admin)> stop ingestion 'f7bd39d0a724457eb5fdce6d81699776';
+----------------------------------+
| task_id                          |
+----------------------------------+
| f7bd39d0a724457eb5fdce6d81699776 |
+----------------------------------+

RAGFlow(admin)> list tasks;
+-----+----------------------------------+-------+------+----------------------------------+---------------------------+------------+------------+
| ETA | assign_to                        | error | from | id                               | last_update               | start_time | status     |
+-----+----------------------------------+-------+------+----------------------------------+---------------------------+------------+------------+
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | eae6431da72a40e796cff3a03008091b | 2026-05-24T19:46:03+08:00 |            | COMPLETED  |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | 6cccdd174bd049ecb05a774bbb47593f | 2026-05-24T19:46:03+08:00 |            | COMPLETED  |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | ef360d777e57485799adb96b30f2b4b8 | 2026-05-24T19:46:03+08:00 |            | CANCELED   |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | bcc5c5448cb64de48b6b6171c36fb790 | 2026-05-24T19:46:03+08:00 |            | CANCELED   |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | bfc25384c43a443294fe2da979a38ac2 | 2026-05-24T19:46:03+08:00 |            | DISPATCHED |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | 84960537b85d413b8990a9efd5952d67 | 2026-05-24T19:46:04+08:00 |            | DISPATCHED |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | 3d223c1b51e24b36861a3bfb2f1d58d4 | 2026-05-24T19:46:03+08:00 |            | CANCELED   |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | e433b0e356b846c89c301621a3c54494 | 2026-05-24T19:46:03+08:00 |            | COMPLETED  |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | 7c93a3880f074ebd8eca14e6b51bb7ef | 2026-05-24T19:46:03+08:00 |            | COMPLETED  |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | df2e4ef51aaf4390bff9a23f2692486e | 2026-05-24T19:46:04+08:00 |            | DISPATCHED |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | 7377c53010194ef7a83aa206698d66ff | 2026-05-24T19:46:05+08:00 |            | DISPATCHED |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | df64d1a1f9d348e3a2f174c4d7d69e73 | 2026-05-24T19:46:05+08:00 |            | DISPATCHED |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | b59834512e2847e1bdf13ace04b8a456 | 2026-05-24T19:46:06+08:00 |            | DISPATCHED |
| 0   | 17937da188b84f23a5c10bb87588944b |       | CLI  | 0064bb0ab69344028d1ecfda053826f4 | 2026-05-24T19:46:03+08:00 |            | QUEUED     |
+-----+----------------------------------+-------+------+----------------------------------+---------------------------+------------+------------+


```


### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2026-05-25 14:00:08 +08:00
committed by GitHub
parent 5d022d83e8
commit f8c626bbc8
36 changed files with 3447 additions and 185 deletions

View File

@@ -192,6 +192,10 @@ func (p *Parser) parseAdminListCommand() (*Command, error) {
return p.parseAdminListFiles()
case TokenTasks:
return p.parseAdminListTasks()
case TokenIngestors:
return p.parseAdminListIngestors()
case TokenIngestion:
return p.parseAdminListIngestionTasks()
default:
return nil, fmt.Errorf("unknown LIST target: %s", p.curToken.Value)
}
@@ -376,6 +380,24 @@ func (p *Parser) parseAdminListTasks() (*Command, error) {
return cmd, nil
}
func (p *Parser) parseAdminListIngestors() (*Command, error) {
p.nextToken() // consume TASKS
cmd := NewCommand("admin_list_ingestors")
return cmd, nil
}
func (p *Parser) parseAdminListIngestionTasks() (*Command, error) {
p.nextToken() // consume Ingestion
if p.curToken.Type != TokenTasks {
return nil, fmt.Errorf("expected TASKS")
}
cmd := NewCommand("list_admin_ingestion_tasks")
return cmd, nil
}
func (p *Parser) parseAdminShowCommand() (*Command, error) {
p.nextToken() // consume SHOW
@@ -1543,10 +1565,19 @@ func (p *Parser) parseAdminStartupCommand() (*Command, error) {
func (p *Parser) parseAdminShutdownCommand() (*Command, error) {
p.nextToken() // consume SHUTDOWN
if p.curToken.Type != TokenService {
return nil, fmt.Errorf("expected SERVICE")
switch p.curToken.Type {
case TokenService:
return p.parseAdminShutdownServiceCommand()
case TokenIngestor:
return p.parseAdminShutdownIngestorCommand()
default:
return nil, fmt.Errorf("expected SERVICE or INGESTOR")
}
p.nextToken()
}
func (p *Parser) parseAdminShutdownServiceCommand() (*Command, error) {
p.nextToken() // consume SERVICE
serviceNum, err := p.parseNumber()
if err != nil {
@@ -1564,6 +1595,25 @@ func (p *Parser) parseAdminShutdownCommand() (*Command, error) {
return cmd, nil
}
func (p *Parser) parseAdminShutdownIngestorCommand() (*Command, error) {
p.nextToken() // consume INGESTOR
ingestorName, err := p.parseQuotedString()
if err != nil {
return nil, err
}
cmd := NewCommand("admin_shutdown_ingestor_command")
cmd.Params["ingestor_name"] = ingestorName
p.nextToken()
// Semicolon is optional for UNSET TOKEN
if p.curToken.Type == TokenSemicolon {
p.nextToken()
}
return cmd, nil
}
func (p *Parser) parseAdminRestartCommand() (*Command, error) {
p.nextToken() // consume RESTART
if p.curToken.Type != TokenService {
@@ -1587,6 +1637,73 @@ func (p *Parser) parseAdminRestartCommand() (*Command, error) {
return cmd, nil
}
func (p *Parser) parseStartIngestion() (*Command, error) {
p.nextToken() // consume Start
if p.curToken.Type != TokenIngestion {
return nil, fmt.Errorf("expect INGESTION")
}
p.nextToken() // consume Ingest
uri, err := p.parseQuotedString()
if err != nil {
return nil, err
}
cmd := NewCommand("admin_start_ingestion_command")
cmd.Params["uri"] = uri
p.nextToken()
// Semicolon is optional for UNSET TOKEN
if p.curToken.Type == TokenSemicolon {
p.nextToken()
}
return cmd, nil
}
func (p *Parser) parseStopIngestion() (*Command, error) {
p.nextToken() // consume Stop
if p.curToken.Type != TokenIngestion {
return nil, fmt.Errorf("expect INGESTION")
}
p.nextToken() // consume Ingest
taskID, err := p.parseQuotedString()
if err != nil {
return nil, err
}
cmd := NewCommand("admin_stop_ingestion_command")
cmd.Params["task_id"] = taskID
p.nextToken()
// Semicolon is optional for UNSET TOKEN
if p.curToken.Type == TokenSemicolon {
p.nextToken()
}
return cmd, nil
}
func (p *Parser) parseAdminIngestCommand() (*Command, error) {
p.nextToken() // consume Ingest
uri, err := p.parseQuotedString()
if err != nil {
return nil, err
}
cmd := NewCommand("admin_ingest_command")
cmd.Params["uri"] = uri
p.nextToken()
// Semicolon is optional for UNSET TOKEN
if p.curToken.Type == TokenSemicolon {
p.nextToken()
}
return cmd, nil
}
func (p *Parser) parseAdminUnsetCommand() (*Command, error) {
p.nextToken() // consume UNSET