diff --git a/internal/dao/document.go b/internal/dao/document.go index 2fa2fa5b0a..b7a6560565 100644 --- a/internal/dao/document.go +++ b/internal/dao/document.go @@ -158,3 +158,45 @@ func (dao *DocumentDAO) SumSizeByDatasetID(datasetID string) (int64, error) { Scan(&total).Error return total, err } + +// GetParsingStatusByKBID aggregates document parsing status counts for a +// dataset, mirroring DocumentService.get_parsing_status_by_kb_ids in Python. +func (dao *DocumentDAO) GetParsingStatusByKBID(kbID string) (map[string]int64, error) { + result := map[string]int64{ + "unstart_count": 0, + "running_count": 0, + "cancel_count": 0, + "done_count": 0, + "fail_count": 0, + } + + var rows []struct { + Run *string `gorm:"column:run"` + Cnt int64 `gorm:"column:cnt"` + } + err := DB.Model(&entity.Document{}). + Select("run, COUNT(id) as cnt"). + Where("kb_id = ?", kbID). + Group("run"). + Scan(&rows).Error + if err != nil { + return nil, err + } + + statusFieldMap := map[string]string{ + string(entity.TaskStatusUnstart): "unstart_count", + string(entity.TaskStatusRunning): "running_count", + string(entity.TaskStatusCancel): "cancel_count", + string(entity.TaskStatusDone): "done_count", + string(entity.TaskStatusFail): "fail_count", + } + for _, row := range rows { + if row.Run == nil { + continue + } + if field, ok := statusFieldMap[*row.Run]; ok { + result[field] = row.Cnt + } + } + return result, nil +} diff --git a/internal/dao/pipeline_operation_log.go b/internal/dao/pipeline_operation_log.go new file mode 100644 index 0000000000..68ab62d945 --- /dev/null +++ b/internal/dao/pipeline_operation_log.go @@ -0,0 +1,156 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// graphRaptorFakeDocID is the sentinel document_id stored on dataset-level
// (graph/raptor/mindmap) pipeline logs. It mirrors GRAPH_RAPTOR_FAKE_DOC_ID
// in api/db/services/task_service.py.
const graphRaptorFakeDocID = "graph_raptor_x"

// pipelineLogOrderableColumns is the allow-list of column names that may be
// placed in an ORDER BY clause. Anything outside this set is rejected by
// pipelineLogOrderClause, so the `orderby` query parameter cannot be used to
// inject arbitrary SQL.
var pipelineLogOrderableColumns = map[string]struct{}{
	"id":               {},
	"document_id":      {},
	"tenant_id":        {},
	"kb_id":            {},
	"pipeline_id":      {},
	"pipeline_title":   {},
	"parser_id":        {},
	"document_name":    {},
	"document_suffix":  {},
	"document_type":    {},
	"source_from":      {},
	"progress":         {},
	"process_begin_at": {},
	"process_duration": {},
	"task_type":        {},
	"operation_status": {},
	"status":           {},
	"create_time":      {},
	"create_date":      {},
	"update_time":      {},
	"update_date":      {},
}

// pipelineLogOrderClause builds the ORDER BY expression for a log listing.
// orderby is used only when it is an exact (case-sensitive) member of
// pipelineLogOrderableColumns; otherwise "create_time" is substituted. desc
// selects " DESC", and " ASC" is appended when it is false.
func pipelineLogOrderClause(orderby string, desc bool) string {
	column := "create_time"
	if _, allowed := pipelineLogOrderableColumns[orderby]; allowed {
		column = orderby
	}

	direction := " ASC"
	if desc {
		direction = " DESC"
	}
	return column + direction
}

// PipelineOperationLogDAO is the data access object for the
// pipeline_operation_log table. It carries no state of its own.
type PipelineOperationLogDAO struct{}

// NewPipelineOperationLogDAO creates a pipeline operation log DAO.
+func NewPipelineOperationLogDAO() *PipelineOperationLogDAO { + return &PipelineOperationLogDAO{} +} + +// GetDatasetLogsByKBID lists dataset-level (graph/raptor/mindmap) ingestion +// logs for a knowledge base. Pagination is only applied when both page and +// pageSize are positive, matching peewee's paginate behaviour. +func (dao *PipelineOperationLogDAO) GetDatasetLogsByKBID(kbID string, page, pageSize int, orderby string, desc bool, operationStatus []string, createDateFrom, createDateTo, keywords string) ([]*entity.PipelineOperationLog, int64, error) { + query := DB.Model(&entity.PipelineOperationLog{}). + Where("kb_id = ? AND document_id = ?", kbID, graphRaptorFakeDocID) + + if keywords != "" { + query = query.Where("LOWER(document_name) LIKE ?", "%"+strings.ToLower(keywords)+"%") + } + if len(operationStatus) > 0 { + query = query.Where("operation_status IN ?", operationStatus) + } + if createDateFrom != "" { + query = query.Where("create_date >= ?", createDateFrom) + } + if createDateTo != "" { + query = query.Where("create_date <= ?", createDateTo) + } + + var count int64 + if err := query.Count(&count).Error; err != nil { + return nil, 0, err + } + + query = query.Order(pipelineLogOrderClause(orderby, desc)) + if page > 0 && pageSize > 0 { + query = query.Offset((page - 1) * pageSize).Limit(pageSize) + } + + var logs []*entity.PipelineOperationLog + if err := query.Find(&logs).Error; err != nil { + return nil, 0, err + } + return logs, count, nil +} + +// GetFileLogsByKBID lists per-file ingestion logs for a knowledge base. +func (dao *PipelineOperationLogDAO) GetFileLogsByKBID(kbID string, page, pageSize int, orderby string, desc bool, keywords string, operationStatus []string, createDateFrom, createDateTo string) ([]*entity.PipelineOperationLog, int64, error) { + query := DB.Model(&entity.PipelineOperationLog{}). 
+ Where("kb_id = ?", kbID) + + if keywords != "" { + query = query.Where("LOWER(document_name) LIKE ?", "%"+strings.ToLower(keywords)+"%") + } + query = query.Where("document_id <> ?", graphRaptorFakeDocID) + + if len(operationStatus) > 0 { + query = query.Where("operation_status IN ?", operationStatus) + } + if createDateFrom != "" { + query = query.Where("create_date >= ?", createDateFrom) + } + if createDateTo != "" { + query = query.Where("create_date <= ?", createDateTo) + } + + var count int64 + if err := query.Count(&count).Error; err != nil { + return nil, 0, err + } + + query = query.Order(pipelineLogOrderClause(orderby, desc)) + if page > 0 && pageSize > 0 { + query = query.Offset((page - 1) * pageSize).Limit(pageSize) + } + + var logs []*entity.PipelineOperationLog + if err := query.Find(&logs).Error; err != nil { + return nil, 0, err + } + return logs, count, nil +} + +// GetByIDAndKBID fetches a single ingestion log scoped to its knowledge base. +func (dao *PipelineOperationLogDAO) GetByIDAndKBID(logID, kbID string) (*entity.PipelineOperationLog, error) { + var log entity.PipelineOperationLog + if err := DB.Where("id = ? AND kb_id = ?", logID, kbID).First(&log).Error; err != nil { + return nil, err + } + return &log, nil +} diff --git a/internal/handler/datasets.go b/internal/handler/datasets.go index 4efc8df124..19ccc194bc 100644 --- a/internal/handler/datasets.go +++ b/internal/handler/datasets.go @@ -171,6 +171,91 @@ func (h *DatasetsHandler) GetDataset(c *gin.Context) { }) } +// GetIngestionSummary handles GET /api/v1/datasets/:dataset_id/ingestions/summary. 
+func (h *DatasetsHandler) GetIngestionSummary(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + datasetID := c.Param("dataset_id") + result, code, err := h.datasetsService.GetIngestionSummary(datasetID, user.ID) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, result, "success") +} + +// ListIngestionLogs handles GET /api/v1/datasets/:dataset_id/ingestions. +func (h *DatasetsHandler) ListIngestionLogs(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + datasetID := c.Param("dataset_id") + + page := 0 + if pageStr := c.Query("page"); pageStr != "" { + p, err := strconv.Atoi(pageStr) + if err != nil { + jsonError(c, common.CodeArgumentError, "page must be an integer") + return + } + page = p + } + + pageSize := 0 + if pageSizeStr := c.Query("page_size"); pageSizeStr != "" { + ps, err := strconv.Atoi(pageSizeStr) + if err != nil { + jsonError(c, common.CodeArgumentError, "page_size must be an integer") + return + } + pageSize = ps + } + + orderby := c.DefaultQuery("orderby", "create_time") + // desc defaults to true and is only disabled by the literal value "false". 
+ desc := strings.ToLower(c.DefaultQuery("desc", "true")) != "false" + operationStatus := c.QueryArray("operation_status") + createDateFrom := c.Query("create_date_from") + createDateTo := c.Query("create_date_to") + logType := c.DefaultQuery("log_type", "dataset") + keywords := c.Query("keywords") + + result, code, err := h.datasetsService.ListIngestionLogs(datasetID, user.ID, page, pageSize, orderby, desc, operationStatus, createDateFrom, createDateTo, logType, keywords) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, result, "success") +} + +// GetIngestionLog handles GET /api/v1/datasets/:dataset_id/ingestions/:log_id. +func (h *DatasetsHandler) GetIngestionLog(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + datasetID := c.Param("dataset_id") + logID := c.Param("log_id") + result, code, err := h.datasetsService.GetIngestionLog(datasetID, user.ID, logID) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, result, "success") +} + // DeleteDatasets handles DELETE /api/v1/datasets. 
func (h *DatasetsHandler) DeleteDatasets(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) diff --git a/internal/router/router.go b/internal/router/router.go index 86eec9ba54..93615d501e 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -202,6 +202,11 @@ func (r *Router) Setup(engine *gin.Engine) { datasets.POST("/search", r.chunkHandler.RetrievalTest) datasets.GET("/metadata/flattened", r.datasetsHandler.ListMetadataFlattened) + // Dataset ingestion logs + datasets.GET("/:dataset_id/ingestions/summary", r.datasetsHandler.GetIngestionSummary) + datasets.GET("/:dataset_id/ingestions", r.datasetsHandler.ListIngestionLogs) + datasets.GET("/:dataset_id/ingestions/:log_id", r.datasetsHandler.GetIngestionLog) + // Dataset documents datasets.GET("/:dataset_id/documents", r.documentHandler.ListDocuments) diff --git a/internal/service/dataset.go b/internal/service/dataset.go index 19be742525..99fb5b3e6f 100644 --- a/internal/service/dataset.go +++ b/internal/service/dataset.go @@ -22,6 +22,7 @@ import ( "fmt" "ragflow/internal/entity" "strings" + "time" "github.com/google/uuid" "gorm.io/gorm" @@ -59,21 +60,23 @@ var ( // DatasetService implements the RESTful dataset APIs from dataset_api.py. type DatasetService struct { - kbDAO *dao.KnowledgebaseDAO - documentDAO *dao.DocumentDAO - connectorDAO *dao.ConnectorDAO - tenantDAO *dao.TenantDAO - tenantLLMDAO *dao.TenantLLMDAO + kbDAO *dao.KnowledgebaseDAO + documentDAO *dao.DocumentDAO + connectorDAO *dao.ConnectorDAO + tenantDAO *dao.TenantDAO + tenantLLMDAO *dao.TenantLLMDAO + pipelineLogDAO *dao.PipelineOperationLogDAO } // NewDatasetService creates a new datasets service. 
func NewDatasetService() *DatasetService { return &DatasetService{ - kbDAO: dao.NewKnowledgebaseDAO(), - documentDAO: dao.NewDocumentDAO(), - connectorDAO: dao.NewConnectorDAO(), - tenantDAO: dao.NewTenantDAO(), - tenantLLMDAO: dao.NewTenantLLMDAO(), + kbDAO: dao.NewKnowledgebaseDAO(), + documentDAO: dao.NewDocumentDAO(), + connectorDAO: dao.NewConnectorDAO(), + tenantDAO: dao.NewTenantDAO(), + tenantLLMDAO: dao.NewTenantLLMDAO(), + pipelineLogDAO: dao.NewPipelineOperationLogDAO(), } } @@ -564,6 +567,186 @@ func (s *DatasetService) Accessible(kbID, userID string) bool { return s.kbDAO.Accessible(kbID, userID) } +// GetIngestionSummary returns dataset-level ingestion counters together with +// the aggregated document parsing status, mirroring +// dataset_api_service.get_ingestion_summary. +func (s *DatasetService) GetIngestionSummary(datasetID, userID string) (map[string]interface{}, common.ErrorCode, error) { + datasetID = strings.TrimSpace(datasetID) + if datasetID == "" { + return nil, common.CodeDataError, errors.New("Lack of \"Dataset ID\"") + } + + if !s.kbDAO.Accessible(datasetID, userID) { + return nil, common.CodeDataError, fmt.Errorf("User '%s' lacks permission for dataset '%s'", userID, datasetID) + } + + kb, err := s.kbDAO.GetByID(datasetID) + if err != nil || kb == nil { + return nil, common.CodeDataError, errors.New("Invalid Dataset ID") + } + + status, err := s.documentDAO.GetParsingStatusByKBID(datasetID) + if err != nil { + return nil, common.CodeServerError, errors.New("Database operation failed") + } + + return map[string]interface{}{ + "doc_num": kb.DocNum, + "chunk_num": kb.ChunkNum, + "token_num": kb.TokenNum, + "status": status, + }, common.CodeSuccess, nil +} + +// ListIngestionLogs lists ingestion logs for a dataset, mirroring +// dataset_api_service.list_ingestion_logs. log_type selects between +// dataset-level logs ("dataset") and per-file logs ("file"). 
+func (s *DatasetService) ListIngestionLogs(datasetID, userID string, page, pageSize int, orderby string, desc bool, operationStatus []string, createDateFrom, createDateTo, logType, keywords string) (map[string]interface{}, common.ErrorCode, error) { + datasetID = strings.TrimSpace(datasetID) + if datasetID == "" { + return nil, common.CodeDataError, errors.New("Lack of \"Dataset ID\"") + } + + if !s.kbDAO.Accessible(datasetID, userID) { + return nil, common.CodeDataError, errors.New("No authorization.") + } + + if logType != "dataset" && logType != "file" { + return nil, common.CodeDataError, errors.New("Invalid \"log_type\", expected \"dataset\" or \"file\"") + } + + var ( + logs []*entity.PipelineOperationLog + total int64 + err error + ) + if logType == "file" { + logs, total, err = s.pipelineLogDAO.GetFileLogsByKBID(datasetID, page, pageSize, orderby, desc, keywords, operationStatus, createDateFrom, createDateTo) + } else { + logs, total, err = s.pipelineLogDAO.GetDatasetLogsByKBID(datasetID, page, pageSize, orderby, desc, operationStatus, createDateFrom, createDateTo, keywords) + } + if err != nil { + return nil, common.CodeServerError, errors.New("Database operation failed") + } + + items := make([]map[string]interface{}, 0, len(logs)) + for _, log := range logs { + if log == nil { + continue + } + if logType == "file" { + items = append(items, fileIngestionLogToMap(log)) + } else { + items = append(items, datasetIngestionLogToMap(log)) + } + } + + return map[string]interface{}{ + "total": total, + "logs": items, + }, common.CodeSuccess, nil +} + +// GetIngestionLog returns a single dataset-level ingestion log, mirroring +// dataset_api_service.get_ingestion_log. 
+func (s *DatasetService) GetIngestionLog(datasetID, userID, logID string) (map[string]interface{}, common.ErrorCode, error) { + datasetID = strings.TrimSpace(datasetID) + if datasetID == "" { + return nil, common.CodeDataError, errors.New("Lack of \"Dataset ID\"") + } + + if !s.kbDAO.Accessible(datasetID, userID) { + return nil, common.CodeDataError, errors.New("No authorization.") + } + + log, err := s.pipelineLogDAO.GetByIDAndKBID(logID, datasetID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, common.CodeDataError, errors.New("Log not found") + } + return nil, common.CodeServerError, errors.New("Database operation failed") + } + + return datasetIngestionLogToMap(log), common.CodeSuccess, nil +} + +func datasetIngestionLogToMap(log *entity.PipelineOperationLog) map[string]interface{} { + return map[string]interface{}{ + "id": log.ID, + "tenant_id": log.TenantID, + "kb_id": log.KbID, + "progress": log.Progress, + "progress_msg": stringPointerValue(log.ProgressMsg), + "process_begin_at": timePointerValue(log.ProcessBeginAt), + "process_duration": log.ProcessDuration, + "task_type": log.TaskType, + "operation_status": log.OperationStatus, + "avatar": stringPointerValue(log.Avatar), + "status": stringPointerValue(log.Status), + "create_time": int64PointerValue(log.CreateTime), + "create_date": timePointerValue(log.CreateDate), + "update_time": int64PointerValue(log.UpdateTime), + "update_date": timePointerValue(log.UpdateDate), + } +} + +func fileIngestionLogToMap(log *entity.PipelineOperationLog) map[string]interface{} { + return map[string]interface{}{ + "id": log.ID, + "document_id": log.DocumentID, + "tenant_id": log.TenantID, + "kb_id": log.KbID, + "pipeline_id": stringPointerValue(log.PipelineID), + "pipeline_title": stringPointerValue(log.PipelineTitle), + "parser_id": log.ParserID, + "document_name": log.DocumentName, + "document_suffix": log.DocumentSuffix, + "document_type": log.DocumentType, + "source_from": log.SourceFrom, 
+ "progress": log.Progress, + "progress_msg": stringPointerValue(log.ProgressMsg), + "process_begin_at": timePointerValue(log.ProcessBeginAt), + "process_duration": log.ProcessDuration, + "dsl": jsonMapValue(log.DSL), + "task_type": log.TaskType, + "operation_status": log.OperationStatus, + "avatar": stringPointerValue(log.Avatar), + "status": stringPointerValue(log.Status), + "create_time": int64PointerValue(log.CreateTime), + "create_date": timePointerValue(log.CreateDate), + "update_time": int64PointerValue(log.UpdateTime), + "update_date": timePointerValue(log.UpdateDate), + } +} + +func stringPointerValue(s *string) interface{} { + if s == nil { + return nil + } + return *s +} + +func int64PointerValue(i *int64) interface{} { + if i == nil { + return nil + } + return *i +} + +func timePointerValue(t *time.Time) interface{} { + if t == nil { + return nil + } + return t.Format("2006-01-02 15:04:05") +} + +func jsonMapValue(m entity.JSONMap) interface{} { + if m == nil { + return nil + } + return m +} + func (s *DatasetService) deleteDataset(tenantID string, kb *entity.Knowledgebase) error { return dao.DB.Transaction(func(tx *gorm.DB) error { var documents []entity.Document