Go: implement dataset ingestion log APIs (#15421)

### What problem does this PR solve?

Part of the Python → Go API server rewrite tracked in #15240 (Dataset
ingestion section). This PR implements the three dataset ingestion
endpoints in the Go API server, mirroring the existing Python
`dataset_api_service` behaviour:

- `GET /api/v1/datasets/<dataset_id>/ingestions/summary`
- `GET /api/v1/datasets/<dataset_id>/ingestions`
- `GET /api/v1/datasets/<dataset_id>/ingestions/<log_id>`

### Type of change

- [x] Refactoring
- [x] New Feature (non-breaking change which adds functionality)

Co-authored-by: sxxtony <sxxtony@users.noreply.github.com>
This commit is contained in:
sxxtony
2026-06-01 06:23:44 +03:00
committed by GitHub
parent 3774916060
commit 12579dbc3d
5 changed files with 481 additions and 10 deletions

View File

@@ -158,3 +158,45 @@ func (dao *DocumentDAO) SumSizeByDatasetID(datasetID string) (int64, error) {
Scan(&total).Error
return total, err
}
// GetParsingStatusByKBID counts the documents of one dataset per parsing
// status, mirroring DocumentService.get_parsing_status_by_kb_ids in Python.
//
// The returned map always carries the keys unstart_count, running_count,
// cancel_count, done_count and fail_count; a status with no documents is
// reported as 0. Rows whose run column is NULL or holds a value that is not
// one of the known task statuses are ignored. On a query failure the map is
// nil and the database error is returned unchanged.
func (dao *DocumentDAO) GetParsingStatusByKBID(kbID string) (map[string]int64, error) {
	// Translation table from the stored run value to the response key.
	fieldByStatus := map[string]string{
		string(entity.TaskStatusUnstart): "unstart_count",
		string(entity.TaskStatusRunning): "running_count",
		string(entity.TaskStatusCancel):  "cancel_count",
		string(entity.TaskStatusDone):    "done_count",
		string(entity.TaskStatusFail):    "fail_count",
	}

	// Seed every counter with zero so absent statuses still show up.
	counts := make(map[string]int64, len(fieldByStatus))
	for _, field := range fieldByStatus {
		counts[field] = 0
	}

	// statusCount receives one grouped row: a run value and its document count.
	type statusCount struct {
		Run *string `gorm:"column:run"`
		Cnt int64   `gorm:"column:cnt"`
	}
	var grouped []statusCount
	if err := DB.Model(&entity.Document{}).
		Select("run, COUNT(id) as cnt").
		Where("kb_id = ?", kbID).
		Group("run").
		Scan(&grouped).Error; err != nil {
		return nil, err
	}

	for _, g := range grouped {
		if g.Run == nil {
			continue
		}
		field, known := fieldByStatus[*g.Run]
		if !known {
			continue
		}
		counts[field] = g.Cnt
	}
	return counts, nil
}

View File

@@ -0,0 +1,156 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package dao
import (
"strings"
"ragflow/internal/entity"
)
// graphRaptorFakeDocID is the placeholder document_id used for dataset-level
// (graph/raptor/mindmap) pipeline logs, mirroring GRAPH_RAPTOR_FAKE_DOC_ID in
// api/db/services/task_service.py. The log queries in this file compare
// document_id against this value to tell dataset-level logs (equal) apart
// from per-file logs (not equal).
const graphRaptorFakeDocID = "graph_raptor_x"
// pipelineLogOrderableColumns is the allow-list of column names that may be
// placed in an ORDER BY clause. Restricting the caller-supplied `orderby`
// query parameter to this set prevents arbitrary SQL from being injected
// through it.
var pipelineLogOrderableColumns = map[string]struct{}{
	"id":               {},
	"document_id":      {},
	"tenant_id":        {},
	"kb_id":            {},
	"pipeline_id":      {},
	"pipeline_title":   {},
	"parser_id":        {},
	"document_name":    {},
	"document_suffix":  {},
	"document_type":    {},
	"source_from":      {},
	"progress":         {},
	"process_begin_at": {},
	"process_duration": {},
	"task_type":        {},
	"operation_status": {},
	"status":           {},
	"create_time":      {},
	"create_date":      {},
	"update_time":      {},
	"update_date":      {},
}

// pipelineLogOrderClause builds the "<column> ASC|DESC" fragment for an
// ORDER BY clause. orderby is used only when it appears in
// pipelineLogOrderableColumns; any other value (including the empty string)
// falls back to create_time. desc selects descending order.
func pipelineLogOrderClause(orderby string, desc bool) string {
	column := "create_time"
	if _, allowed := pipelineLogOrderableColumns[orderby]; allowed {
		column = orderby
	}
	direction := " ASC"
	if desc {
		direction = " DESC"
	}
	return column + direction
}
// PipelineOperationLogDAO is the data access object for the
// pipeline_operation_log table. It holds no state of its own.
type PipelineOperationLogDAO struct{}

// NewPipelineOperationLogDAO returns a ready-to-use pipeline operation log DAO.
func NewPipelineOperationLogDAO() *PipelineOperationLogDAO {
	return new(PipelineOperationLogDAO)
}
// GetDatasetLogsByKBID lists dataset-level (graph/raptor/mindmap) ingestion
// logs for a knowledge base, i.e. rows whose document_id equals
// graphRaptorFakeDocID.
//
// Optional filters are applied only when non-empty: keywords is a
// case-insensitive substring match on document_name, operationStatus is an
// IN list, and createDateFrom/createDateTo bound create_date inclusively.
// Results are ordered via pipelineLogOrderClause. Pagination is only applied
// when both page and pageSize are positive, matching peewee's paginate
// behaviour.
//
// It returns the selected logs together with the total number of rows that
// match the filters (counted before pagination), or the database error.
func (dao *PipelineOperationLogDAO) GetDatasetLogsByKBID(kbID string, page, pageSize int, orderby string, desc bool, operationStatus []string, createDateFrom, createDateTo, keywords string) ([]*entity.PipelineOperationLog, int64, error) {
	query := DB.Model(&entity.PipelineOperationLog{}).
		Where("kb_id = ? AND document_id = ?", kbID, graphRaptorFakeDocID)
	if keywords != "" {
		// Escape the LIKE metacharacters so a "%" or "_" typed by the user is
		// matched literally instead of acting as a wildcard. "!" is used as
		// the escape character (declared with ESCAPE) because a backslash
		// literal is not written the same way on every SQL backend.
		escaped := strings.NewReplacer("!", "!!", "%", "!%", "_", "!_").Replace(strings.ToLower(keywords))
		query = query.Where("LOWER(document_name) LIKE ? ESCAPE '!'", "%"+escaped+"%")
	}
	if len(operationStatus) > 0 {
		query = query.Where("operation_status IN ?", operationStatus)
	}
	if createDateFrom != "" {
		query = query.Where("create_date >= ?", createDateFrom)
	}
	if createDateTo != "" {
		query = query.Where("create_date <= ?", createDateTo)
	}

	// Count before ordering/pagination so the total reflects every match.
	var count int64
	if err := query.Count(&count).Error; err != nil {
		return nil, 0, err
	}

	query = query.Order(pipelineLogOrderClause(orderby, desc))
	if page > 0 && pageSize > 0 {
		query = query.Offset((page - 1) * pageSize).Limit(pageSize)
	}

	var logs []*entity.PipelineOperationLog
	if err := query.Find(&logs).Error; err != nil {
		return nil, 0, err
	}
	return logs, count, nil
}
// GetFileLogsByKBID lists per-file ingestion logs for a knowledge base, i.e.
// rows whose document_id differs from graphRaptorFakeDocID.
//
// Optional filters are applied only when non-empty: keywords is a
// case-insensitive substring match on document_name, operationStatus is an
// IN list, and createDateFrom/createDateTo bound create_date inclusively.
// Results are ordered via pipelineLogOrderClause, and pagination is applied
// only when both page and pageSize are positive.
//
// It returns the selected logs together with the total number of rows that
// match the filters (counted before pagination), or the database error.
func (dao *PipelineOperationLogDAO) GetFileLogsByKBID(kbID string, page, pageSize int, orderby string, desc bool, keywords string, operationStatus []string, createDateFrom, createDateTo string) ([]*entity.PipelineOperationLog, int64, error) {
	query := DB.Model(&entity.PipelineOperationLog{}).
		Where("kb_id = ?", kbID)
	if keywords != "" {
		// Escape the LIKE metacharacters so a "%" or "_" typed by the user is
		// matched literally instead of acting as a wildcard. "!" is used as
		// the escape character (declared with ESCAPE) because a backslash
		// literal is not written the same way on every SQL backend.
		escaped := strings.NewReplacer("!", "!!", "%", "!%", "_", "!_").Replace(strings.ToLower(keywords))
		query = query.Where("LOWER(document_name) LIKE ? ESCAPE '!'", "%"+escaped+"%")
	}
	query = query.Where("document_id <> ?", graphRaptorFakeDocID)
	if len(operationStatus) > 0 {
		query = query.Where("operation_status IN ?", operationStatus)
	}
	if createDateFrom != "" {
		query = query.Where("create_date >= ?", createDateFrom)
	}
	if createDateTo != "" {
		query = query.Where("create_date <= ?", createDateTo)
	}

	// Count before ordering/pagination so the total reflects every match.
	var count int64
	if err := query.Count(&count).Error; err != nil {
		return nil, 0, err
	}

	query = query.Order(pipelineLogOrderClause(orderby, desc))
	if page > 0 && pageSize > 0 {
		query = query.Offset((page - 1) * pageSize).Limit(pageSize)
	}

	var logs []*entity.PipelineOperationLog
	if err := query.Find(&logs).Error; err != nil {
		return nil, 0, err
	}
	return logs, count, nil
}
// GetByIDAndKBID loads the ingestion log with the given id, but only if it
// belongs to the given knowledge base. When no row matches, or the query
// fails, the error reported by the database layer is returned and the log is
// nil.
func (dao *PipelineOperationLogDAO) GetByIDAndKBID(logID, kbID string) (*entity.PipelineOperationLog, error) {
	record := new(entity.PipelineOperationLog)
	lookup := DB.Where("id = ? AND kb_id = ?", logID, kbID).First(record)
	if lookup.Error != nil {
		return nil, lookup.Error
	}
	return record, nil
}

View File

@@ -171,6 +171,91 @@ func (h *DatasetsHandler) GetDataset(c *gin.Context) {
})
}
// GetIngestionSummary handles GET /api/v1/datasets/:dataset_id/ingestions/summary.
// It resolves the calling user, asks the datasets service for the ingestion
// summary of the dataset named in the path, and writes either the summary or
// the error code and message produced along the way.
func (h *DatasetsHandler) GetIngestionSummary(c *gin.Context) {
	caller, authCode, authMessage := GetUser(c)
	if authCode != common.CodeSuccess {
		jsonError(c, authCode, authMessage)
		return
	}

	summary, code, err := h.datasetsService.GetIngestionSummary(c.Param("dataset_id"), caller.ID)
	if err != nil {
		jsonError(c, code, err.Error())
		return
	}
	jsonResponse(c, common.CodeSuccess, summary, "success")
}
// ListIngestionLogs handles GET /api/v1/datasets/:dataset_id/ingestions.
//
// Recognised query parameters:
//   - page, page_size: optional integers, 0 when absent; a non-integer value
//     is rejected with an argument error.
//   - orderby: defaults to "create_time".
//   - desc: defaults to true; only the value "false" (any letter case)
//     switches to ascending order.
//   - operation_status: may be repeated.
//   - create_date_from, create_date_to, keywords: optional strings.
//   - log_type: defaults to "dataset".
//
// The parsed values are handed to the datasets service and its result, or its
// error code and message, is written to the response.
func (h *DatasetsHandler) ListIngestionLogs(c *gin.Context) {
	caller, authCode, authMessage := GetUser(c)
	if authCode != common.CodeSuccess {
		jsonError(c, authCode, authMessage)
		return
	}

	// queryInt reads an optional integer query parameter; an absent or empty
	// parameter yields 0 without error.
	queryInt := func(name string) (int, error) {
		raw := c.Query(name)
		if raw == "" {
			return 0, nil
		}
		return strconv.Atoi(raw)
	}

	page, err := queryInt("page")
	if err != nil {
		jsonError(c, common.CodeArgumentError, "page must be an integer")
		return
	}
	pageSize, err := queryInt("page_size")
	if err != nil {
		jsonError(c, common.CodeArgumentError, "page_size must be an integer")
		return
	}

	// desc defaults to true and is only disabled by the literal value "false".
	descending := strings.ToLower(c.DefaultQuery("desc", "true")) != "false"

	result, code, err := h.datasetsService.ListIngestionLogs(
		c.Param("dataset_id"),
		caller.ID,
		page,
		pageSize,
		c.DefaultQuery("orderby", "create_time"),
		descending,
		c.QueryArray("operation_status"),
		c.Query("create_date_from"),
		c.Query("create_date_to"),
		c.DefaultQuery("log_type", "dataset"),
		c.Query("keywords"),
	)
	if err != nil {
		jsonError(c, code, err.Error())
		return
	}
	jsonResponse(c, common.CodeSuccess, result, "success")
}
// GetIngestionLog handles GET /api/v1/datasets/:dataset_id/ingestions/:log_id.
// It resolves the calling user, asks the datasets service for the log named
// in the path within the dataset named in the path, and writes either that
// log or the error code and message produced along the way.
func (h *DatasetsHandler) GetIngestionLog(c *gin.Context) {
	caller, authCode, authMessage := GetUser(c)
	if authCode != common.CodeSuccess {
		jsonError(c, authCode, authMessage)
		return
	}

	entry, code, err := h.datasetsService.GetIngestionLog(c.Param("dataset_id"), caller.ID, c.Param("log_id"))
	if err != nil {
		jsonError(c, code, err.Error())
		return
	}
	jsonResponse(c, common.CodeSuccess, entry, "success")
}
// DeleteDatasets handles DELETE /api/v1/datasets.
func (h *DatasetsHandler) DeleteDatasets(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)

View File

@@ -202,6 +202,11 @@ func (r *Router) Setup(engine *gin.Engine) {
datasets.POST("/search", r.chunkHandler.RetrievalTest)
datasets.GET("/metadata/flattened", r.datasetsHandler.ListMetadataFlattened)
// Dataset ingestion logs
datasets.GET("/:dataset_id/ingestions/summary", r.datasetsHandler.GetIngestionSummary)
datasets.GET("/:dataset_id/ingestions", r.datasetsHandler.ListIngestionLogs)
datasets.GET("/:dataset_id/ingestions/:log_id", r.datasetsHandler.GetIngestionLog)
// Dataset documents
datasets.GET("/:dataset_id/documents", r.documentHandler.ListDocuments)

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"ragflow/internal/entity"
"strings"
"time"
"github.com/google/uuid"
"gorm.io/gorm"
@@ -59,21 +60,23 @@ var (
// DatasetService implements the RESTful dataset APIs from dataset_api.py.
type DatasetService struct {
kbDAO *dao.KnowledgebaseDAO
documentDAO *dao.DocumentDAO
connectorDAO *dao.ConnectorDAO
tenantDAO *dao.TenantDAO
tenantLLMDAO *dao.TenantLLMDAO
kbDAO *dao.KnowledgebaseDAO
documentDAO *dao.DocumentDAO
connectorDAO *dao.ConnectorDAO
tenantDAO *dao.TenantDAO
tenantLLMDAO *dao.TenantLLMDAO
pipelineLogDAO *dao.PipelineOperationLogDAO
}
// NewDatasetService creates a new datasets service.
func NewDatasetService() *DatasetService {
return &DatasetService{
kbDAO: dao.NewKnowledgebaseDAO(),
documentDAO: dao.NewDocumentDAO(),
connectorDAO: dao.NewConnectorDAO(),
tenantDAO: dao.NewTenantDAO(),
tenantLLMDAO: dao.NewTenantLLMDAO(),
kbDAO: dao.NewKnowledgebaseDAO(),
documentDAO: dao.NewDocumentDAO(),
connectorDAO: dao.NewConnectorDAO(),
tenantDAO: dao.NewTenantDAO(),
tenantLLMDAO: dao.NewTenantLLMDAO(),
pipelineLogDAO: dao.NewPipelineOperationLogDAO(),
}
}
@@ -564,6 +567,186 @@ func (s *DatasetService) Accessible(kbID, userID string) bool {
return s.kbDAO.Accessible(kbID, userID)
}
// GetIngestionSummary returns the dataset's doc_num, chunk_num and token_num
// counters together with the aggregated document parsing status under the
// "status" key, mirroring dataset_api_service.get_ingestion_summary.
//
// A blank dataset ID, a dataset the user cannot access, or a dataset that
// cannot be loaded yields CodeDataError; a failure while aggregating the
// parsing status yields CodeServerError.
func (s *DatasetService) GetIngestionSummary(datasetID, userID string) (map[string]interface{}, common.ErrorCode, error) {
	datasetID = strings.TrimSpace(datasetID)
	if datasetID == "" {
		return nil, common.CodeDataError, errors.New("Lack of \"Dataset ID\"")
	}
	if allowed := s.kbDAO.Accessible(datasetID, userID); !allowed {
		return nil, common.CodeDataError, fmt.Errorf("User '%s' lacks permission for dataset '%s'", userID, datasetID)
	}

	kb, err := s.kbDAO.GetByID(datasetID)
	if err != nil {
		return nil, common.CodeDataError, errors.New("Invalid Dataset ID")
	}
	if kb == nil {
		return nil, common.CodeDataError, errors.New("Invalid Dataset ID")
	}

	parsingStatus, err := s.documentDAO.GetParsingStatusByKBID(datasetID)
	if err != nil {
		return nil, common.CodeServerError, errors.New("Database operation failed")
	}

	summary := make(map[string]interface{}, 4)
	summary["doc_num"] = kb.DocNum
	summary["chunk_num"] = kb.ChunkNum
	summary["token_num"] = kb.TokenNum
	summary["status"] = parsingStatus
	return summary, common.CodeSuccess, nil
}
// ListIngestionLogs lists ingestion logs for a dataset, mirroring
// dataset_api_service.list_ingestion_logs. logType selects between
// dataset-level logs ("dataset") and per-file logs ("file"); any other value
// is rejected.
//
// Checks run in this order: blank dataset ID, access rights, logType. Each
// failure yields CodeDataError, while a failing query yields
// CodeServerError. On success the result holds "total" (the match count
// reported by the DAO) and "logs" (one map per returned row, shaped
// according to logType).
func (s *DatasetService) ListIngestionLogs(datasetID, userID string, page, pageSize int, orderby string, desc bool, operationStatus []string, createDateFrom, createDateTo, logType, keywords string) (map[string]interface{}, common.ErrorCode, error) {
	datasetID = strings.TrimSpace(datasetID)
	if datasetID == "" {
		return nil, common.CodeDataError, errors.New("Lack of \"Dataset ID\"")
	}
	if !s.kbDAO.Accessible(datasetID, userID) {
		return nil, common.CodeDataError, errors.New("No authorization.")
	}

	var (
		rows     []*entity.PipelineOperationLog
		total    int64
		queryErr error
		toMap    func(*entity.PipelineOperationLog) map[string]interface{}
	)
	// Pick the query and the row serializer that belong to the log type.
	switch logType {
	case "file":
		toMap = fileIngestionLogToMap
		rows, total, queryErr = s.pipelineLogDAO.GetFileLogsByKBID(datasetID, page, pageSize, orderby, desc, keywords, operationStatus, createDateFrom, createDateTo)
	case "dataset":
		toMap = datasetIngestionLogToMap
		rows, total, queryErr = s.pipelineLogDAO.GetDatasetLogsByKBID(datasetID, page, pageSize, orderby, desc, operationStatus, createDateFrom, createDateTo, keywords)
	default:
		return nil, common.CodeDataError, errors.New("Invalid \"log_type\", expected \"dataset\" or \"file\"")
	}
	if queryErr != nil {
		return nil, common.CodeServerError, errors.New("Database operation failed")
	}

	items := make([]map[string]interface{}, 0, len(rows))
	for _, row := range rows {
		if row != nil {
			items = append(items, toMap(row))
		}
	}

	response := map[string]interface{}{
		"total": total,
		"logs":  items,
	}
	return response, common.CodeSuccess, nil
}
// GetIngestionLog returns a single ingestion log of the dataset, serialized
// with the dataset-level field set, mirroring
// dataset_api_service.get_ingestion_log.
//
// A blank dataset ID, a dataset the user cannot access, or a log that does
// not exist within the dataset yields CodeDataError; any other lookup
// failure yields CodeServerError.
func (s *DatasetService) GetIngestionLog(datasetID, userID, logID string) (map[string]interface{}, common.ErrorCode, error) {
	datasetID = strings.TrimSpace(datasetID)
	if datasetID == "" {
		return nil, common.CodeDataError, errors.New("Lack of \"Dataset ID\"")
	}
	if !s.kbDAO.Accessible(datasetID, userID) {
		return nil, common.CodeDataError, errors.New("No authorization.")
	}

	entry, err := s.pipelineLogDAO.GetByIDAndKBID(logID, datasetID)
	switch {
	case err == nil:
		return datasetIngestionLogToMap(entry), common.CodeSuccess, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return nil, common.CodeDataError, errors.New("Log not found")
	default:
		return nil, common.CodeServerError, errors.New("Database operation failed")
	}
}
// datasetIngestionLogToMap serializes a log with the dataset-level field set.
// Optional (pointer) columns are emitted as nil when unset, and timestamps
// are rendered through timePointerValue. log must not be nil.
func datasetIngestionLogToMap(log *entity.PipelineOperationLog) map[string]interface{} {
	out := make(map[string]interface{}, 15)

	// Identity.
	out["id"] = log.ID
	out["tenant_id"] = log.TenantID
	out["kb_id"] = log.KbID

	// Processing state.
	out["progress"] = log.Progress
	out["progress_msg"] = stringPointerValue(log.ProgressMsg)
	out["process_begin_at"] = timePointerValue(log.ProcessBeginAt)
	out["process_duration"] = log.ProcessDuration
	out["task_type"] = log.TaskType
	out["operation_status"] = log.OperationStatus
	out["avatar"] = stringPointerValue(log.Avatar)
	out["status"] = stringPointerValue(log.Status)

	// Audit columns.
	out["create_time"] = int64PointerValue(log.CreateTime)
	out["create_date"] = timePointerValue(log.CreateDate)
	out["update_time"] = int64PointerValue(log.UpdateTime)
	out["update_date"] = timePointerValue(log.UpdateDate)
	return out
}
// fileIngestionLogToMap serializes a log with the per-file field set, which
// adds the document, pipeline, parser and DSL columns to the dataset-level
// fields. Optional (pointer) columns are emitted as nil when unset, and
// timestamps are rendered through timePointerValue. log must not be nil.
func fileIngestionLogToMap(log *entity.PipelineOperationLog) map[string]interface{} {
	out := make(map[string]interface{}, 24)

	// Identity.
	out["id"] = log.ID
	out["document_id"] = log.DocumentID
	out["tenant_id"] = log.TenantID
	out["kb_id"] = log.KbID

	// Pipeline and parser.
	out["pipeline_id"] = stringPointerValue(log.PipelineID)
	out["pipeline_title"] = stringPointerValue(log.PipelineTitle)
	out["parser_id"] = log.ParserID
	out["dsl"] = jsonMapValue(log.DSL)

	// Source document.
	out["document_name"] = log.DocumentName
	out["document_suffix"] = log.DocumentSuffix
	out["document_type"] = log.DocumentType
	out["source_from"] = log.SourceFrom

	// Processing state.
	out["progress"] = log.Progress
	out["progress_msg"] = stringPointerValue(log.ProgressMsg)
	out["process_begin_at"] = timePointerValue(log.ProcessBeginAt)
	out["process_duration"] = log.ProcessDuration
	out["task_type"] = log.TaskType
	out["operation_status"] = log.OperationStatus
	out["avatar"] = stringPointerValue(log.Avatar)
	out["status"] = stringPointerValue(log.Status)

	// Audit columns.
	out["create_time"] = int64PointerValue(log.CreateTime)
	out["create_date"] = timePointerValue(log.CreateDate)
	out["update_time"] = int64PointerValue(log.UpdateTime)
	out["update_date"] = timePointerValue(log.UpdateDate)
	return out
}
// stringPointerValue unwraps an optional string for serialization: it yields
// the pointed-to string, or an untyped nil when s is nil.
func stringPointerValue(s *string) interface{} {
	var out interface{}
	if s != nil {
		out = *s
	}
	return out
}
// int64PointerValue unwraps an optional int64 for serialization: it yields
// the pointed-to number, or an untyped nil when i is nil.
func int64PointerValue(i *int64) interface{} {
	var out interface{}
	if i != nil {
		out = *i
	}
	return out
}
// timePointerValue renders an optional timestamp for serialization as
// "YYYY-MM-DD HH:MM:SS", or yields an untyped nil when t is nil.
func timePointerValue(t *time.Time) interface{} {
	var out interface{}
	if t != nil {
		out = t.Format("2006-01-02 15:04:05")
	}
	return out
}
// jsonMapValue prepares an optional JSON map for serialization: a nil map is
// turned into an untyped nil instead of a nil map wrapped in an interface,
// and any other map is passed through unchanged.
func jsonMapValue(m entity.JSONMap) interface{} {
	if m != nil {
		return m
	}
	return nil
}
func (s *DatasetService) deleteDataset(tenantID string, kb *entity.Knowledgebase) error {
return dao.DB.Transaction(func(tx *gorm.DB) error {
var documents []entity.Document