mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
### What problem does this PR solve? ``` RAGFlow(admin)> mq publish 'msg2'; SUCCESS RAGFlow(admin)> mq publish 'msg3'; SUCCESS RAGFlow(admin)> mq list; +---------+---------------+ | message | subject | +---------+---------------+ | msg1 | tasks.RAGFLOW | | msg2 | tasks.RAGFLOW | | msg3 | tasks.RAGFLOW | +---------+---------------+ RAGFlow(admin)> mq pull 2; +---------+---------------+ | message | subject | +---------+---------------+ | msg1 | tasks.RAGFLOW | | msg2 | tasks.RAGFLOW | +---------+---------------+ RAGFlow(admin)> mq pull noack; +---------+---------------+ | message | subject | +---------+---------------+ | abc | tasks.RAGFLOW | +---------+---------------+ RAGFlow(admin)> mq show +-------------------+----------------+--------+---------------+---------------+-------------------+---------------+ | ack_pending_count | consumer_count | memory | message_count | pending_count | redelivered_count | waiting_count | +-------------------+----------------+--------+---------------+---------------+-------------------+---------------+ | 2 | 1 | 0 | 2 | 0 | 1 | 0 | +-------------------+----------------+--------+---------------+---------------+-------------------+---------------+ RAGFlow(admin)> list ingestors; +--------------+-------------------------------------------+--------+ | host | name | status | +--------------+-------------------------------------------+--------+ | 192.168.1.38 | ingestor-8f0e4bd5650a4ac58b0151969fbf6935 | alive | +--------------+-------------------------------------------+--------+ RAGFlow(admin)> list ingestion tasks; +----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+ | document_id | id | status | step | user | user_id | +----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+ | ffe64fae423411f1a2d938a74640adcc | 90d3d0f6528941c1ac8eb0360effccc4 | COMPLETED | 5 | 
aaa@aaa.com | 2ba4881420fa11f19e9c38a74640adcc | +----------------------------------+----------------------------------+-----------+------+-------------+----------------------------------+ RAGFlow(admin)> remove ingestion tasks '90d3d0f6528941c1ac8eb0360effccc4'; +---------+----------------------------------+ | delete | task_id | +---------+----------------------------------+ | success | 90d3d0f6528941c1ac8eb0360effccc4 | +---------+----------------------------------+ RAGFlow(admin)> stop ingestion tasks 'e89e20d9a25848a1b79bd9345ddbfe1d'; +----------+----------------------------------+ | status | task_id | +----------+----------------------------------+ | STOPPING | e89e20d9a25848a1b79bd9345ddbfe1d | +----------+----------------------------------+ # Publish a message RAGFlow(admin)> mq publish 'cdd'; SUCCESS # List current tasks in the message queue RAGFlow(admin)> mq list +----------------------------------+---------------+ | message | subject | +----------------------------------+---------------+ | 7ce392a3c1624cd2be4b5276e8825059 | tasks.RAGFLOW | +----------------------------------+---------------+ # Consume a task from the message queue RAGFlow(admin)> mq pull +------+-----+----------------+ | ack | id | type | +------+-----+----------------+ | true | cdd | ingestion_test | +------+-----+----------------+ # User mode # List ingestion tasks, followed by dataset id RAGFlow(user)> list ingestion tasks from '0abe79f9423311f1ad8d38a74640adcc'; +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | create_date | create_time | dataset_id | document_id | id | schema | status | update_date | update_time | user_id | 
+---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | 2026-05-30T20:21:06+08:00 | 1780143666289 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | 8d758cd14a8b4ba8ab505003fb52017d | | COMPLETED | 2026-05-30T20:21:26+08:00 | 1780143686431 | 2ba4881420fa11f19e9c38a74640adcc | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ RAGFlow(user)> list ingestion tasks; +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | create_date | create_time | dataset_id | document_id | id | schema | status | update_date | update_time | user_id | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | 2026-06-02T19:02:31+08:00 | 1780398151417 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | e89e20d9a25848a1b79bd9345ddbfe1d | | COMPLETED | 2026-06-02T19:02:52+08:00 | 1780398172208 | 2ba4881420fa11f19e9c38a74640adcc | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ # Create an ingestion task # First argument is document id, second argument is dataset id 
RAGFlow(user)> start ingestion 'ffe64fae423411f1a2d938a74640adcc' from '0abe79f9423311f1ad8d38a74640adcc'; +----------------------------------+-------------------------------------------+ | document_id | result | +----------------------------------+-------------------------------------------+ | ffe64fae423411f1a2d938a74640adcc | task_id: 8d758cd14a8b4ba8ab505003fb52017d | +----------------------------------+-------------------------------------------+ # Pause an ingestion task, first argument is ingestion id RAGFlow(user)> stop ingestion '8d758cd14a8b4ba8ab505003fb52017d'; +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | create_date | create_time | dataset_id | document_id | id | schema | status | update_date | update_time | user_id | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ | 2026-05-30T20:21:06+08:00 | 1780143666289 | 0abe79f9423311f1ad8d38a74640adcc | ffe64fae423411f1a2d938a74640adcc | 8d758cd14a8b4ba8ab505003fb52017d | | COMPLETED | 2026-05-30T20:21:26+08:00 | 1780143686431 | 2ba4881420fa11f19e9c38a74640adcc | +---------------------------+---------------+----------------------------------+----------------------------------+----------------------------------+--------+-----------+---------------------------+---------------+----------------------------------+ # Delete an ingestion task RAGFlow(api/default)> remove ingestion tasks 'f366450a27d54677aec1c7090add30f0'; +---------+----------------------------------+ | remove | task_id | +---------+----------------------------------+ | success | f366450a27d54677aec1c7090add30f0 | 
+---------+----------------------------------+ ``` ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai <haijin.chn@gmail.com>
478 lines
13 KiB
Go
478 lines
13 KiB
Go
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
package dao
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"ragflow/internal/common"
|
|
"ragflow/internal/entity"
|
|
)
|
|
|
|
// IngestionTaskDAO groups the database operations on ingestion task records.
// It holds no state of its own; the methods in this file go through the
// package-level DB handle.
type IngestionTaskDAO struct{}

// NewIngestionTaskDAO returns a pointer to a fresh, stateless IngestionTaskDAO.
func NewIngestionTaskDAO() *IngestionTaskDAO {
	return new(IngestionTaskDAO)
}
|
|
|
|
// Use by api server to create task
|
|
// created → running : After the ingestor component assigns the task, it changes the status to running
|
|
// running → completed : Task executes successfully
|
|
// running → failed : Error occurs during execution
|
|
// created → canceling : User cancels before the task is picked up by the ingestor
|
|
// running → canceling : User cancels during execution
|
|
// completed → canceling : User cancels a completed task (e.g., for cleanup/rollback)
|
|
// canceling → canceled : Cancellation completes
|
|
// failed → created : Retry (back to start)
|
|
// canceled → created : Retry/re-execute (back to start)
|
|
func (dao *IngestionTaskDAO) CheckAndCreate(ingestionTask *entity.IngestionTask) (*entity.IngestionTask, error) {
|
|
|
|
tx := DB.Begin()
|
|
if tx.Error != nil {
|
|
return nil, tx.Error
|
|
}
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
tx.Rollback()
|
|
panic(r)
|
|
}
|
|
}()
|
|
|
|
// Check if the task is created
|
|
var taskRecord *entity.IngestionTask
|
|
err := tx.Where("document_id = ?", ingestionTask.DocumentID).First(&taskRecord).Error
|
|
if err == nil {
|
|
// found
|
|
if taskRecord.Status == common.FAILED || taskRecord.Status == common.STOPPED {
|
|
// restart the task
|
|
err = tx.Model(&entity.IngestionTask{}).Where("id = ?", taskRecord.ID).Update("status", common.CREATED).Error
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
} else {
|
|
return nil, fmt.Errorf("document id %s already exists, status: %s, task id: %s", ingestionTask.DocumentID, taskRecord.Status, taskRecord.ID)
|
|
}
|
|
} else {
|
|
// create ingestion task
|
|
ingestionTask.ID = common.GenerateUUID()
|
|
if err = tx.Create(ingestionTask).Error; err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
taskRecord = ingestionTask
|
|
}
|
|
|
|
if err = tx.Commit().Error; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return taskRecord, nil
|
|
}
|
|
|
|
// UpdateStatus Update ingestion task status
|
|
func (dao *IngestionTaskDAO) UpdateStatus(taskID, status string) error {
|
|
return DB.Model(&entity.IngestionTask{}).Where("id = ?", taskID).Update("status", status).Error
|
|
}
|
|
|
|
// CheckAnd called by ingestor
|
|
// if task status is RUNNING, COMPLETED, STOPPED, FAILED, just return without error
|
|
// if task status is CREATE, update to RUNNING
|
|
// if task status is STOPPING, update to STOPPED
|
|
func (dao *IngestionTaskDAO) SetRunningByIngestor(taskID string) (*entity.IngestionTask, error) {
|
|
|
|
tx := DB.Begin()
|
|
if tx.Error != nil {
|
|
return nil, tx.Error
|
|
}
|
|
var committed bool
|
|
|
|
defer func() {
|
|
if committed {
|
|
tx.Commit()
|
|
} else {
|
|
tx.Rollback()
|
|
if r := recover(); r != nil {
|
|
panic(r)
|
|
}
|
|
}
|
|
}()
|
|
|
|
var tasks []*entity.IngestionTask
|
|
err := tx.Where("id = ?", taskID).Find(&tasks).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(tasks) == 0 {
|
|
return nil, common.ErrTaskNotFound
|
|
}
|
|
|
|
if len(tasks) != 1 {
|
|
return nil, fmt.Errorf("task %s has multiple records", taskID)
|
|
}
|
|
|
|
taskStatus := tasks[0].Status
|
|
switch taskStatus {
|
|
case common.CREATED:
|
|
tasks[0].Status = common.RUNNING
|
|
err = tx.Model(&entity.IngestionTask{}).Where("id = ?", taskID).Update("status", common.RUNNING).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
committed = true
|
|
return tasks[0], nil
|
|
case common.STOPPING:
|
|
tasks[0].Status = common.STOPPED
|
|
err = tx.Model(&entity.IngestionTask{}).Where("id = ?", taskID).Update("status", common.STOPPED).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
committed = true
|
|
return tasks[0], nil
|
|
case common.RUNNING:
|
|
// this task was executing before, just return without error
|
|
committed = true
|
|
return tasks[0], nil
|
|
default:
|
|
return tasks[0], nil
|
|
}
|
|
}
|
|
|
|
func (dao *IngestionTaskDAO) SetStoppingByAPIServer(taskID string) (*entity.IngestionTask, error) {
|
|
|
|
tx := DB.Begin()
|
|
if tx.Error != nil {
|
|
return nil, tx.Error
|
|
}
|
|
var committed bool
|
|
|
|
defer func() {
|
|
if committed {
|
|
tx.Commit()
|
|
} else {
|
|
tx.Rollback()
|
|
if r := recover(); r != nil {
|
|
panic(r)
|
|
}
|
|
}
|
|
}()
|
|
|
|
var tasks []*entity.IngestionTask
|
|
err := tx.Where("id = ?", taskID).Find(&tasks).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(tasks) == 0 {
|
|
return nil, fmt.Errorf("task %s not found", taskID)
|
|
}
|
|
|
|
if len(tasks) != 1 {
|
|
return nil, fmt.Errorf("task %s has multiple records", taskID)
|
|
}
|
|
|
|
taskStatus := tasks[0].Status
|
|
switch taskStatus {
|
|
case common.CREATED:
|
|
tasks[0].Status = common.STOPPED
|
|
err = tx.Model(&entity.IngestionTask{}).Where("id = ?", taskID).Update("status", common.STOPPED).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
committed = true
|
|
return tasks[0], nil
|
|
case common.RUNNING:
|
|
tasks[0].Status = common.STOPPING
|
|
err = tx.Model(&entity.IngestionTask{}).Where("id = ?", taskID).Update("status", common.STOPPING).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
committed = true
|
|
return tasks[0], nil
|
|
default:
|
|
return tasks[0], nil
|
|
}
|
|
}
|
|
|
|
type (
	// TaskletInfo names one tasklet together with the files recorded for it.
	// It is serialised to JSON with the keys "tasklet_id" and
	// "files_to_delete".
	TaskletInfo struct {
		TaskletID     string   `json:"tasklet_id"`
		FilesToDelete []string `json:"files_to_delete"`
	}

	// TaskInfo names one task, the files recorded for the task itself and the
	// per-tasklet breakdown. It is serialised to JSON with the keys
	// "task_id", "files_to_delete" and "tasklets".
	TaskInfo struct {
		TaskID        string        `json:"task_id"`
		FilesToDelete []string      `json:"files_to_delete"`
		Tasklets      []TaskletInfo `json:"tasklets"`
	}
)
|
|
|
|
func (dao *IngestionTaskDAO) RemoveByAPIServerOrAdminServer(taskID string, userID *string) (*TaskInfo, error) {
|
|
|
|
tx := DB.Begin()
|
|
if tx.Error != nil {
|
|
return nil, tx.Error
|
|
}
|
|
var committed bool
|
|
|
|
defer func() {
|
|
if committed {
|
|
tx.Commit()
|
|
} else {
|
|
tx.Rollback()
|
|
if r := recover(); r != nil {
|
|
panic(r)
|
|
}
|
|
}
|
|
}()
|
|
|
|
var tasks []*entity.IngestionTask
|
|
err := tx.Where("id = ?", taskID).Find(&tasks).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(tasks) == 0 {
|
|
return nil, fmt.Errorf("task %s not found", taskID)
|
|
}
|
|
|
|
if len(tasks) != 1 {
|
|
return nil, fmt.Errorf("task %s has multiple records", taskID)
|
|
}
|
|
|
|
if userID != nil {
|
|
if tasks[0].UserID != *userID {
|
|
return nil, errors.New("task does not belong to the user")
|
|
}
|
|
}
|
|
|
|
taskStatus := tasks[0].Status
|
|
switch taskStatus {
|
|
case common.CREATED, common.STOPPED, common.COMPLETED, common.FAILED:
|
|
// get all ingestion tasklets
|
|
var tasklets []*entity.IngestionTasklet
|
|
err = tx.Where("task_id = ?", taskID).Find(&tasklets).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var TaskletInfos []TaskletInfo
|
|
for _, tasklet := range tasklets {
|
|
// get all ingestion tasklet log
|
|
var taskletLogs []*entity.IngestionTaskletLog
|
|
err = tx.Where("tasklet_id = ?", tasklet.ID).Find(&taskletLogs).Error
|
|
|
|
fileMap := make(map[string]bool)
|
|
for _, taskletLog := range taskletLogs {
|
|
files, ok := taskletLog.Checkpoint["files"].([]string)
|
|
if ok {
|
|
for _, file := range files {
|
|
fileMap[file] = true
|
|
}
|
|
}
|
|
}
|
|
var filesToDelete []string
|
|
for file := range fileMap {
|
|
filesToDelete = append(filesToDelete, file)
|
|
}
|
|
TaskletInfos = append(TaskletInfos, TaskletInfo{
|
|
TaskletID: tasklet.ID,
|
|
FilesToDelete: filesToDelete,
|
|
})
|
|
}
|
|
|
|
// get all ingestion task log
|
|
var taskLogs []*entity.IngestionTaskLog
|
|
err = tx.Where("task_id = ?", taskID).Find(&taskLogs).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fileMap := make(map[string]bool)
|
|
for _, taskLog := range taskLogs {
|
|
files, ok := taskLog.Checkpoint["files"].([]string)
|
|
if ok {
|
|
for _, file := range files {
|
|
fileMap[file] = true
|
|
}
|
|
}
|
|
}
|
|
var filesToDelete []string
|
|
for file := range fileMap {
|
|
filesToDelete = append(filesToDelete, file)
|
|
}
|
|
|
|
err = tx.Model(&entity.IngestionTask{}).Where("id = ?", taskID).Delete(&entity.IngestionTask{}).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
taskInfo := &TaskInfo{
|
|
TaskID: taskID,
|
|
FilesToDelete: filesToDelete,
|
|
Tasklets: TaskletInfos,
|
|
}
|
|
committed = true
|
|
return taskInfo, nil
|
|
default:
|
|
return nil, fmt.Errorf("task %s is executing, cannot be removed", taskID)
|
|
}
|
|
}
|
|
|
|
func (dao *IngestionTaskDAO) GetAllTasks(page, pageSize int) ([]*entity.IngestionTask, error) {
|
|
var tasks []*entity.IngestionTask
|
|
var err error
|
|
if pageSize == 0 {
|
|
err = DB.Find(&tasks).Error
|
|
} else {
|
|
err = DB.Order("create_time DESC").Offset((page - 1) * pageSize).Limit(pageSize).Find(&tasks).Error
|
|
}
|
|
return tasks, err
|
|
}
|
|
|
|
func (dao *IngestionTaskDAO) ListByUserID(userID string, page, pageSize int) ([]*entity.IngestionTask, error) {
|
|
var tasks []*entity.IngestionTask
|
|
var err error
|
|
if pageSize == 0 {
|
|
err = DB.Where("user_id = ?", userID).Order("create_time DESC").Find(&tasks).Error
|
|
} else {
|
|
err = DB.Where("user_id = ?", userID).Order("create_time DESC").Offset((page - 1) * pageSize).Limit(pageSize).Find(&tasks).Error
|
|
}
|
|
|
|
return tasks, err
|
|
}
|
|
|
|
func (dao *IngestionTaskDAO) ListByUserIDAndDatasetID(userID, datasetID string, page, pageSize int) ([]*entity.IngestionTask, error) {
|
|
var tasks []*entity.IngestionTask
|
|
var err error
|
|
if pageSize == 0 {
|
|
err = DB.Where("user_id = ? AND dataset_id = ?", userID, datasetID).Order("create_time DESC").Find(&tasks).Error
|
|
} else {
|
|
err = DB.Where("user_id = ? AND dataset_id = ?", userID, datasetID).Order("create_time DESC").Offset((page - 1) * pageSize).Limit(pageSize).Find(&tasks).Error
|
|
}
|
|
|
|
return tasks, err
|
|
}
|
|
|
|
func (dao *IngestionTaskDAO) GetByID(id string) (*entity.IngestionTask, error) {
|
|
var task *entity.IngestionTask
|
|
err := DB.Where("id = ?", id).First(&task).Error
|
|
return task, err
|
|
}
|
|
|
|
func (dao *IngestionTaskDAO) GetByDocumentID(documentId string) (*entity.IngestionTask, error) {
|
|
var task *entity.IngestionTask
|
|
err := DB.Where("document_id = ?", documentId).First(&task).Error
|
|
return task, err
|
|
}
|
|
|
|
// IngestionTaskLogDAO groups the database operations on ingestion task log
// records. It holds no state of its own; the methods in this file go through
// the package-level DB handle.
type IngestionTaskLogDAO struct{}

// NewIngestionTaskLogDAO returns a pointer to a fresh, stateless
// IngestionTaskLogDAO.
func NewIngestionTaskLogDAO() *IngestionTaskLogDAO {
	return new(IngestionTaskLogDAO)
}
|
|
|
|
func (dao *IngestionTaskLogDAO) Create(ingestionLog *entity.IngestionTaskLog) error {
|
|
return DB.Create(ingestionLog).Error
|
|
}
|
|
|
|
func (dao *IngestionTaskLogDAO) ListLogsByTaskID(taskID string) ([]*entity.IngestionTaskLog, error) {
|
|
var tasks []*entity.IngestionTaskLog
|
|
err := DB.Where("task_id = ?", taskID).Order("create_time DESC").Find(&tasks).Error
|
|
return tasks, err
|
|
}
|
|
|
|
func (dao *IngestionTaskLogDAO) LatestLogByTaskID(taskID string) (*entity.IngestionTaskLog, error) {
|
|
var task *entity.IngestionTaskLog
|
|
err := DB.Where("task_id = ?", taskID).Order("create_time DESC").First(&task).Error
|
|
return task, err
|
|
}
|
|
|
|
func (dao *IngestionTaskLogDAO) GetLogByLogID(logID string) (*entity.IngestionTaskLog, error) {
|
|
var task *entity.IngestionTaskLog
|
|
err := DB.Where("id = ?", logID).First(&task).Error
|
|
return task, err
|
|
}
|
|
|
|
func (dao *IngestionTaskLogDAO) DeleteByTaskID(taskID string) (int64, error) {
|
|
result := DB.Unscoped().Where("task_id = ?", taskID).Delete(&entity.IngestionTaskLog{})
|
|
return result.RowsAffected, result.Error
|
|
}
|
|
|
|
// IngestionTaskletDAO groups the database operations on ingestion tasklet
// records. It holds no state of its own; the methods in this file go through
// the package-level DB handle.
type IngestionTaskletDAO struct{}

// NewIngestionTaskletDAO returns a pointer to a fresh, stateless
// IngestionTaskletDAO.
func NewIngestionTaskletDAO() *IngestionTaskletDAO {
	return new(IngestionTaskletDAO)
}
|
|
|
|
func (dao *IngestionTaskletDAO) Create(ingestionTasklet *entity.IngestionTasklet) error {
|
|
return DB.Create(ingestionTasklet).Error
|
|
}
|
|
|
|
func (dao *IngestionTaskletDAO) UpdateStatus(taskletID, status string) error {
|
|
return DB.Model(&entity.IngestionTasklet{}).Where("id = ?", taskletID).Update("status", status).Error
|
|
}
|
|
func (dao *IngestionTaskletDAO) GetAllTasklets() ([]*entity.IngestionTasklet, error) {
|
|
var tasks []*entity.IngestionTasklet
|
|
err := DB.Find(&tasks).Error
|
|
return tasks, err
|
|
}
|
|
|
|
func (dao *IngestionTaskletDAO) ListByUserID(userID string) ([]*entity.IngestionTasklet, error) {
|
|
var tasks []*entity.IngestionTasklet
|
|
err := DB.Where("user_id = ?", userID).Find(&tasks).Error
|
|
return tasks, err
|
|
}
|
|
|
|
func (dao *IngestionTaskletDAO) GetByID(id string) (*entity.IngestionTasklet, error) {
|
|
var task *entity.IngestionTasklet
|
|
err := DB.Where("id = ?", id).First(&task).Error
|
|
return task, err
|
|
}
|
|
|
|
// IngestionTaskletLogDAO groups the database operations on ingestion tasklet
// log records. It holds no state of its own; the methods in this file go
// through the package-level DB handle.
type IngestionTaskletLogDAO struct{}

// NewIngestionTaskletLogDAO returns a pointer to a fresh, stateless
// IngestionTaskletLogDAO.
func NewIngestionTaskletLogDAO() *IngestionTaskletLogDAO {
	return new(IngestionTaskletLogDAO)
}
|
|
|
|
func (dao *IngestionTaskletLogDAO) Create(ingestionLog *entity.IngestionTaskletLog) error {
|
|
return DB.Create(ingestionLog).Error
|
|
}
|
|
|
|
func (dao *IngestionTaskletLogDAO) ListLogsByTaskletID(taskID string) ([]*entity.IngestionTaskletLog, error) {
|
|
var tasks []*entity.IngestionTaskletLog
|
|
err := DB.Where("task_id = ?", taskID).Find(&tasks).Error
|
|
return tasks, err
|
|
}
|
|
|
|
func (dao *IngestionTaskletLogDAO) GetLogByLogID(logID string) (*entity.IngestionTaskletLog, error) {
|
|
var task *entity.IngestionTaskletLog
|
|
err := DB.Where("id = ?", logID).First(&task).Error
|
|
return task, err
|
|
}
|
|
|
|
func (dao *IngestionTaskletLogDAO) LatestLogByTaskletID(taskletID string) (*entity.IngestionTaskletLog, error) {
|
|
var tasklet *entity.IngestionTaskletLog
|
|
err := DB.Where("tasklet_id = ?", taskletID).Order("create_time DESC").First(&tasklet).Error
|
|
return tasklet, err
|
|
}
|
|
|
|
func (dao *IngestionTaskletLogDAO) DeleteByTaskletID(taskID string) (int64, error) {
|
|
result := DB.Unscoped().Where("task_id = ?", taskID).Delete(&entity.IngestionTaskletLog{})
|
|
return result.RowsAffected, result.Error
|
|
}
|