mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
### What problem does this PR solve? ``` RAGFlow(admin)> mq publish 'msg2'; SUCCESS RAGFlow(admin)> mq publish 'msg3'; SUCCESS RAGFlow(admin)> mq list; +---------+---------------+ | message | subject | +---------+---------------+ | msg1 | tasks.RAGFLOW | | msg2 | tasks.RAGFLOW | | msg3 | tasks.RAGFLOW | +---------+---------------+ RAGFlow(admin)> mq pull 2; +---------+---------------+ | message | subject | +---------+---------------+ | msg1 | tasks.RAGFLOW | | msg2 | tasks.RAGFLOW | +---------+---------------+ RAGFlow(admin)> mq pull noack; +---------+---------------+ | message | subject | +---------+---------------+ | abc | tasks.RAGFLOW | +---------+---------------+ RAGFlow(admin)> mq show +-------------------+----------------+--------+---------------+---------------+-------------------+---------------+ | ack_pending_count | consumer_count | memory | message_count | pending_count | redelivered_count | waiting_count | +-------------------+----------------+--------+---------------+---------------+-------------------+---------------+ | 2 | 1 | 0 | 2 | 0 | 1 | 0 | +-------------------+----------------+--------+---------------+---------------+-------------------+---------------+ RAGFlow(admin)> list ingestors; +--------------+-------------------------------------------+--------+ | host | name | status | +--------------+-------------------------------------------+--------+ | 192.168.1.38 | ingestor-8f0e4bd5650a4ac58b0151969fbf6935 | alive | +--------------+-------------------------------------------+--------+ RAGFlow(admin)> list ingestion tasks; +----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+ | document_id | id | status | step | user | user_id | +----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+ | ffe64fae423411f1a2d938a74640adcc | 90d3d0f6528941c1ac8eb0360effccc4 | COMPLETED | 5 | aaa@aaa.com | 2ba4881420fa11f19e9c38a74640adcc | +----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+ RAGFlow(admin)> remove ingestion tasks '90d3d0f6528941c1ac8eb0360effccc4'; +---------+----------------------------------+ | delete | task_id | +---------+----------------------------------+ | success | 90d3d0f6528941c1ac8eb0360effccc4 | +---------+----------------------------------+ RAGFlow(admin)> stop ingestion tasks 'e89e20d9a25848a1b79bd9345ddbfe1d'; +----------+----------------------------------+ | status | task_id | +----------+----------------------------------+ | STOPPING | e89e20d9a25848a1b79bd9345ddbfe1d | +----------+----------------------------------+ # Publish a message RAGFlow(admin)> mq publish 'cdd'; SUCCESS # List current tasks in the message queue RAGFlow(admin)> mq list +----------------------------------+---------------+ | message | subject | +----------------------------------+---------------+ | 7ce392a3c1624cd2be4b5276e8825059 | tasks.RAGFLOW | +----------------------------------+---------------+ # Consume a task from the message queue RAGFlow(admin)> mq pull +------+-----+----------------+ | ack | id | type | +------+-----+----------------+ | true | cdd | ingestion_test | +------+-----+----------------+ # User mode # List ingestion tasks, followed by dataset id RAGFlow(user)> list ingestion tasks from '0abe79f9423311f1ad8d38a74640adcc'; +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | create_date | create_time | dataset_id | document_id | id | schema | status | update_date | update_time | user_id | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | 2026-05-30T20:21:06+08:00 | 1780143666289 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | 8d758cd14a8b4ba8ab505003fb52017d | | COMPLETED | 2026-05-30T20:21:26+08:00 | 1780143686431 | 2ba4881420fa11f19e9c38a74640adcc | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ RAGFlow(user)> list ingestion tasks; +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | create_date | create_time | dataset_id | document_id | id | schema | status | update_date | update_time | user_id | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | 2026-06-02T19:02:31+08:00 | 1780398151417 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | e89e20d9a25848a1b79bd9345ddbfe1d | | COMPLETED | 2026-06-02T19:02:52+08:00 | 1780398172208 | 2ba4881420fa11f19e9c38a74640adcc | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ # Create an ingestion task # First argument is document id, second argument is dataset id RAGFlow(user)> start ingestion 'ffe64fae423411f1a2d938a74640adcc' from '0abe79f9423311f1ad8d38a74640adcc'; +----------------------------------+-------------------------------------------+ | document_id | result | +----------------------------------+-------------------------------------------+ | ffe64fae423411f1a2d938a74640adcc | task_id: 8d758cd14a8b4ba8ab505003fb52017d | +----------------------------------+-------------------------------------------+ # Pause an ingestion task, first argument is ingestion id RAGFlow(user)> stop ingestion '8d758cd14a8b4ba8ab505003fb52017d'; +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | create_date | create_time | dataset_id | document_id | id | schema | status | update_date | update_time | user_id | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | 2026-05-30T20:21:06+08:00 | 1780143666289 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | 8d758cd14a8b4ba8ab505003fb52017d | | COMPLETED | 2026-05-30T20:21:26+08:00 | 1780143686431 | 2ba4881420fa11f19e9c38a74640adcc | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ # Delete an ingestion task RAGFlow(api/default)> remove ingestion tasks 'f366450a27d54677aec1c7090add30f0'; +---------+----------------------------------+ | remove | task_id | +---------+----------------------------------+ | success | f366450a27d54677aec1c7090add30f0 | +---------+----------------------------------+ ``` ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai <haijin.chn@gmail.com>
150 lines
3.7 KiB
Go
150 lines
3.7 KiB
Go
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
package service
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"ragflow/internal/common"
|
|
"ragflow/internal/server"
|
|
"ragflow/internal/utility"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var AdminServiceClient *AdminClient
|
|
|
|
// AdminClient is responsible for sending heartbeat reports to the admin server
|
|
type AdminClient struct {
|
|
client *utility.HTTPClient
|
|
logger *zap.Logger
|
|
serverType common.ServerType
|
|
serverName string
|
|
host string
|
|
port int
|
|
version string
|
|
lastSuccess bool
|
|
attemptCount int
|
|
}
|
|
|
|
// NewAdminClient creates a new heartbeat service instance
|
|
func NewAdminClient(logger *zap.Logger, serverType common.ServerType, serverName, host string, port int) *AdminClient {
|
|
return &AdminClient{
|
|
logger: logger,
|
|
serverType: serverType,
|
|
serverName: serverName,
|
|
host: host,
|
|
port: port,
|
|
version: utility.GetRAGFlowVersion(),
|
|
lastSuccess: false,
|
|
attemptCount: 0,
|
|
}
|
|
}
|
|
|
|
// InitHTTPClient initializes the HTTP client with admin server configuration
|
|
func (h *AdminClient) InitHTTPClient() error {
|
|
adminConfig := server.GetAdminConfig()
|
|
if adminConfig == nil {
|
|
return fmt.Errorf("admin configuration not found")
|
|
}
|
|
|
|
h.client = utility.NewHTTPClientBuilder().
|
|
WithHost(adminConfig.Host).
|
|
WithPort(adminConfig.Port).
|
|
WithTimeout(10 * time.Second).
|
|
Build()
|
|
|
|
h.logger.Info("Heartbeat HTTP client initialized",
|
|
zap.String("admin_host", adminConfig.Host),
|
|
zap.Int("admin_port", adminConfig.Port),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// SendHeartbeat sends a heartbeat message to the admin server
|
|
func (h *AdminClient) SendHeartbeat() error {
|
|
|
|
if h.attemptCount < 10 {
|
|
if h.lastSuccess {
|
|
h.attemptCount++
|
|
return nil
|
|
}
|
|
}
|
|
h.attemptCount = 0
|
|
h.lastSuccess = false
|
|
|
|
if h.client == nil {
|
|
if err := h.InitHTTPClient(); err != nil {
|
|
h.logger.Error("Failed to initialize HTTP client", zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
message := &common.BaseMessage{
|
|
MessageID: time.Now().UnixNano(),
|
|
MessageType: common.MessageHeartbeat,
|
|
ServerName: h.serverName,
|
|
ServerType: h.serverType,
|
|
Host: h.host,
|
|
Port: h.port,
|
|
Version: h.version,
|
|
Timestamp: time.Now(),
|
|
Ext: nil,
|
|
}
|
|
|
|
jsonData, err := json.Marshal(message)
|
|
if err != nil {
|
|
h.logger.Error("Failed to marshal heartbeat message", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
resp, err := h.client.PostJSON("/api/v1/admin/reports", jsonData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != 200 {
|
|
// extract the Code and Message field of the response
|
|
var responseBody map[string]interface{}
|
|
err = json.NewDecoder(resp.Body).Decode(&responseBody)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
code, ok := responseBody["code"].(float64)
|
|
if !ok {
|
|
return fmt.Errorf("unexpected heartbeat response (status %d): missing or non-numeric \"code\" field", resp.StatusCode)
|
|
}
|
|
responseCode := common.ErrorCode(code)
|
|
if responseCode != common.CodeLicenseValid {
|
|
return errors.New(responseCode.Message())
|
|
}
|
|
}
|
|
|
|
h.logger.Debug("Heartbeat sent successfully",
|
|
zap.String("server_id", h.serverName),
|
|
zap.String("server_type", string(h.serverType)),
|
|
)
|
|
|
|
h.lastSuccess = true
|
|
|
|
return nil
|
|
}
|