Files
ragflow/internal/service/document.go
Hz_ f59332bc37 feat(go-api): implement Go-side document PATCH API & align parsing/metadata sync behavior (#15975)
### What problem does this PR solve?

This PR implements the Go backend counterpart for the document partial
update API:
`PATCH /api/v1/datasets/:dataset_id/documents/:document_id`

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Refactoring
2026-06-18 11:08:47 +08:00

2090 lines
61 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/engine"
"ragflow/internal/engine/redis"
enginetypes "ragflow/internal/engine/types"
"ragflow/internal/entity"
"ragflow/internal/server"
"ragflow/internal/storage"
"ragflow/internal/tokenizer"
"ragflow/internal/utility"
"gorm.io/gorm"
)
// DocumentService document service
type DocumentService struct {
documentDAO *dao.DocumentDAO
kbDAO *dao.KnowledgebaseDAO
ingestionTaskDAO *dao.IngestionTaskDAO
ingestionTaskLogDAO *dao.IngestionTaskLogDAO
docEngine engine.DocEngine
engineType server.EngineType
metadataSvc *MetadataService
taskDAO *dao.TaskDAO
file2DocumentDAO *dao.File2DocumentDAO
fileDAO *dao.FileDAO
}
// NewDocumentService create document service
func NewDocumentService() *DocumentService {
cfg := server.GetConfig()
return &DocumentService{
documentDAO: dao.NewDocumentDAO(),
ingestionTaskDAO: dao.NewIngestionTaskDAO(),
ingestionTaskLogDAO: dao.NewIngestionTaskLogDAO(),
kbDAO: dao.NewKnowledgebaseDAO(),
docEngine: engine.Get(),
engineType: cfg.DocEngine.Type,
metadataSvc: NewMetadataService(),
taskDAO: dao.NewTaskDAO(),
file2DocumentDAO: dao.NewFile2DocumentDAO(),
fileDAO: dao.NewFileDAO(),
}
}
// CreateDocumentRequest create document request
type CreateDocumentRequest struct {
Name string `json:"name" binding:"required"`
KbID string `json:"kb_id" binding:"required"`
ParserID string `json:"parser_id" binding:"required"`
CreatedBy string `json:"created_by" binding:"required"`
Type string `json:"type"`
Source string `json:"source"`
}
// UpdateDocumentRequest update document request
type UpdateDocumentRequest struct {
Name *string `json:"name"`
Run *string `json:"run"`
TokenNum *int64 `json:"token_num"`
ChunkNum *int64 `json:"chunk_num"`
Progress *float64 `json:"progress"`
ProgressMsg *string `json:"progress_msg"`
}
// DocumentResponse document response
type DocumentResponse struct {
ID string `json:"id"`
Name *string `json:"name,omitempty"`
KbID string `json:"kb_id"`
ParserID string `json:"parser_id"`
PipelineID *string `json:"pipeline_id,omitempty"`
Type string `json:"type"`
SourceType string `json:"source_type"`
CreatedBy string `json:"created_by"`
Location *string `json:"location,omitempty"`
Size int64 `json:"size"`
TokenNum int64 `json:"token_num"`
ChunkNum int64 `json:"chunk_num"`
Progress float64 `json:"progress"`
ProgressMsg *string `json:"progress_msg,omitempty"`
ProcessDuration float64 `json:"process_duration"`
Suffix string `json:"suffix"`
Run *string `json:"run,omitempty"`
Status *string `json:"status,omitempty"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
type ThumbnailResponse struct {
ID string `json:"id"`
Thumbnail *string `json:"thumbnail,omitempty"`
KbID string `json:"kb_id"`
}
type ArtifactResponse struct {
Data []byte
ContentType string
SafeFilename string
ForceAttachment bool
}
type UpdateDatasetDocumentRequest struct {
Name *string `json:"name"`
ChunkMethod *string `json:"chunk_method"`
ParserID *string `json:"parser_id"`
ChunkCount *int64 `json:"chunk_count"`
TokenCount *int64 `json:"token_count"`
PipelineID *string `json:"pipeline_id"`
Enabled *int `json:"enabled"`
Progress *float64 `json:"progress"`
ParserConfig map[string]any `json:"parser_config"`
MetaFields map[string]any `json:"meta_fields"`
}
// PATCH /api/v1/datasets/:dataset_id/documents/:document_id.
type UpdateDatasetDocumentResponse struct {
ID string `json:"id"`
Thumbnail *string `json:"thumbnail,omitempty"`
DatasetID string `json:"dataset_id"`
ChunkMethod string `json:"chunk_method"`
PipelineID *string `json:"pipeline_id,omitempty"`
ParserConfig map[string]interface{} `json:"parser_config"`
SourceType string `json:"source_type"`
Type string `json:"type"`
CreatedBy string `json:"created_by"`
Name *string `json:"name,omitempty"`
Location *string `json:"location,omitempty"`
Size int64 `json:"size"`
TokenCount int64 `json:"token_count"`
ChunkCount int64 `json:"chunk_count"`
Progress float64 `json:"progress"`
ProgressMsg *string `json:"progress_msg,omitempty"`
ProcessBeginAt *time.Time `json:"process_begin_at,omitempty"`
ProcessDuration float64 `json:"process_duration"`
ContentHash *string `json:"content_hash,omitempty"`
MetaFields map[string]interface{} `json:"meta_fields,omitempty"`
Suffix string `json:"suffix"`
Run string `json:"run"`
Status *string `json:"status,omitempty"`
CreateTime *int64 `json:"create_time,omitempty"`
CreateDate *time.Time `json:"create_date,omitempty"`
UpdateTime *int64 `json:"update_time,omitempty"`
UpdateDate *time.Time `json:"update_date,omitempty"`
}
var (
ErrArtifactInvalidFilename = errors.New("Invalid filename.")
ErrArtifactInvalidFileType = errors.New("Invalid file type.")
ErrArtifactNotFound = errors.New("Artifact not found.")
)
var artifactContentTypes = map[string]string{
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".svg": "image/svg+xml",
".pdf": "application/pdf",
".csv": "text/csv",
".json": "application/json",
".html": "text/html",
}
var artifactForceAttachmentExtensions = map[string]struct{}{
".htm": {},
".html": {},
".shtml": {},
".xht": {},
".xhtml": {},
".xml": {},
".mhtml": {},
".svg": {},
}
var artifactForceAttachmentContentTypes = map[string]struct{}{
"text/html": {},
"image/svg+xml": {},
"application/xhtml+xml": {},
"text/xml": {},
"application/xml": {},
"multipart/related": {},
}
var artifactUnsafeFilenameChars = regexp.MustCompile(`[^\pL\pN_.-]`)
// GetDocumentImage retrieves an image object from storage.
func (s *DocumentService) GetDocumentImage(imageID string) ([]byte, error) {
parts := strings.Split(imageID, "-")
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return nil, fmt.Errorf("Image not found.")
}
storageImpl := storage.GetStorageFactory().GetStorage()
if storageImpl == nil {
return nil, fmt.Errorf("storage not initialized")
}
return storageImpl.Get(parts[0], parts[1])
}
// GetDocumentArtifact retrieves a sandbox artifact from object storage.
func (s *DocumentService) GetDocumentArtifact(filename string) (*ArtifactResponse, error) {
basename := filepath.Base(filename)
if basename != filename || strings.Contains(filename, "/") || strings.Contains(filename, "\\") {
return nil, ErrArtifactInvalidFilename
}
ext := strings.ToLower(filepath.Ext(basename))
contentType, ok := artifactContentTypes[ext]
if !ok {
return nil, ErrArtifactInvalidFileType
}
storageImpl := storage.GetStorageFactory().GetStorage()
if storageImpl == nil {
return nil, fmt.Errorf("storage not initialized")
}
bucket := sandboxArtifactBucket()
if !storageImpl.ObjExist(bucket, basename) {
return nil, ErrArtifactNotFound
}
data, err := storageImpl.Get(bucket, basename)
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, ErrArtifactNotFound
}
return &ArtifactResponse{
Data: data,
ContentType: contentType,
SafeFilename: sanitizeArtifactFilename(basename),
ForceAttachment: shouldForceArtifactAttachment(ext, contentType),
}, nil
}
func sandboxArtifactBucket() string {
if bucket := os.Getenv("SANDBOX_ARTIFACT_BUCKET"); bucket != "" {
return bucket
}
return "sandbox-artifacts"
}
func sanitizeArtifactFilename(filename string) string {
return artifactUnsafeFilenameChars.ReplaceAllString(filename, "_")
}
func shouldForceArtifactAttachment(ext, contentType string) bool {
if _, ok := artifactForceAttachmentExtensions[strings.ToLower(ext)]; ok {
return true
}
_, ok := artifactForceAttachmentContentTypes[strings.ToLower(contentType)]
return ok
}
type DocumentPreview struct {
Data []byte
ContentType string
FileName string
}
func (s *DocumentService) GetDocumentPreview(docID string) (*DocumentPreview, error) {
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
return nil, err
}
bucket, name, err := s.GetDocumentStorageAddress(doc)
if err != nil {
return nil, err
}
storageImpl := storage.GetStorageFactory().GetStorage()
if storageImpl == nil {
return nil, fmt.Errorf("storage not initialized")
}
data, err := storageImpl.Get(bucket, name)
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, ErrArtifactNotFound
}
fileName := ""
if doc.Name != nil {
fileName = *doc.Name
}
ext := utility.GetFileExtension(fileName)
contentType := utility.GetContentType(ext, doc.Type)
return &DocumentPreview{
Data: data,
ContentType: contentType,
FileName: fileName,
}, nil
}
func (s *DocumentService) GetDocumentStorageAddress(doc *entity.Document) (string, string, error) {
if doc == nil {
return "", "", fmt.Errorf("document is nil")
}
file2DocumentDAO := dao.NewFile2DocumentDAO()
fileDAO := dao.NewFileDAO()
mappings, err := file2DocumentDAO.GetByDocumentID(doc.ID)
if err != nil {
return "", "", err
}
if len(mappings) > 0 && mappings[0].FileID != nil {
file, err := fileDAO.GetByID(*mappings[0].FileID)
if err != nil {
return "", "", err
}
if file.SourceType == "" || entity.FileSource(file.SourceType) == entity.FileSourceLocal {
if file.Location == nil || *file.Location == "" {
return "", "", fmt.Errorf("file location is empty")
}
return file.ParentID, *file.Location, nil
}
}
if doc.Location == nil || *doc.Location == "" {
return "", "", fmt.Errorf("document location is empty")
}
return doc.KbID, *doc.Location, nil
}
type DownloadDocumentResp struct {
Data []byte
FileName string
ContentType string
}
func (s *DocumentService) DownloadDocument(datasetID, docID string) (*DownloadDocumentResp, error) {
if docID == "" {
return nil, fmt.Errorf("Specify document_id please.")
}
doc, err := s.documentDAO.GetByID(docID)
if err != nil || doc.KbID != datasetID {
return nil, fmt.Errorf("The dataset not own the document %s.", docID)
}
bucket, name, err := s.GetDocumentStorageAddress(doc)
if err != nil {
return nil, err
}
storageImpl := storage.GetStorageFactory().GetStorage()
if storageImpl == nil {
return nil, fmt.Errorf("storage not initialized")
}
data, err := storageImpl.Get(bucket, name)
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, fmt.Errorf("This file is empty.")
}
fileName := ""
if doc.Name != nil {
fileName = *doc.Name
}
return &DownloadDocumentResp{
Data: data,
FileName: fileName,
ContentType: "application/octet-stream",
}, nil
}
// CreateDocument create document
func (s *DocumentService) CreateDocument(req *CreateDocumentRequest) (*entity.Document, error) {
document := &entity.Document{
Name: &req.Name,
KbID: req.KbID,
ParserID: req.ParserID,
CreatedBy: req.CreatedBy,
Type: req.Type,
SourceType: req.Source,
Suffix: ".doc",
Status: func() *string { s := "0"; return &s }(),
}
if err := s.documentDAO.Create(document); err != nil {
return nil, fmt.Errorf("failed to create document: %w", err)
}
return document, nil
}
// GetDocumentByID get document by ID
func (s *DocumentService) GetDocumentByID(id string) (*DocumentResponse, error) {
document, err := s.documentDAO.GetByID(id)
if err != nil {
return nil, err
}
return s.toResponse(document), nil
}
// UpdateDocument update document
func (s *DocumentService) UpdateDocument(id string, req *UpdateDocumentRequest) error {
document, err := s.documentDAO.GetByID(id)
if err != nil {
return err
}
if req.Name != nil {
document.Name = req.Name
}
if req.Run != nil {
document.Run = req.Run
}
if req.TokenNum != nil {
document.TokenNum = *req.TokenNum
}
if req.ChunkNum != nil {
document.ChunkNum = *req.ChunkNum
}
if req.Progress != nil {
document.Progress = *req.Progress
}
if req.ProgressMsg != nil {
document.ProgressMsg = req.ProgressMsg
}
return s.documentDAO.Update(document)
}
// DeleteDocument delete document — delegates to full cleanup logic.
func (s *DocumentService) DeleteDocument(id string) error {
return s.deleteDocumentFull(id)
}
// DeleteDocuments deletes multiple documents under a dataset.
//
// ids: specific document IDs; deleteAll: delete all docs in the dataset.
// Returns the number of successfully deleted documents.
func (s *DocumentService) DeleteDocuments(ids []string, deleteAll bool, datasetID, userID string) (int, error) {
// 1. Check dataset is accessible by the user
if !s.kbDAO.Accessible(datasetID, userID) {
return 0, fmt.Errorf("You don't own the dataset %s.", datasetID)
}
// 2. Resolve document IDs
if deleteAll {
if err := dao.DB.Model(&entity.Document{}).
Where("kb_id = ?", datasetID).
Pluck("id", &ids).Error; err != nil {
return 0, fmt.Errorf("failed to query documents: %w", err)
}
}
if len(ids) == 0 {
return 0, nil
}
// 3. Deduplicate (before validation so dup count doesn't matter)
ids = common.Deduplicate(ids)
// 4. Validate IDs belong to this dataset (only for explicit ids; deleteAll is already scoped)
if !deleteAll {
if _, err := s.validateDocsInDataset(ids, datasetID); err != nil {
return 0, err
}
}
// 5. Delete each document (non-critical failures are tolerated per doc)
deleted := 0
for _, docID := range ids {
if err := s.deleteDocumentFull(docID); err != nil {
common.Logger.Warn(fmt.Sprintf("DeleteDocuments: failed to delete %s: %v", docID, err))
continue
}
deleted++
}
return deleted, nil
}
// deleteDocumentFull performs full document cleanup. Non-critical failures
// are tolerated (logged and continue). Critical failures (e.g. document or
// KB not found) return an error immediately.
func (s *DocumentService) deleteDocumentFull(docID string) error {
doc, kb, err := s.resolveDocAndKB(docID)
if err != nil {
return err
}
// Delete tasks from DB
if _, delErr := s.taskDAO.DeleteByDocIDs([]string{docID}); delErr != nil {
common.Logger.Warn(fmt.Sprintf("failed to delete tasks for %s: %v", docID, delErr))
}
s.deleteDocEngineData(docID, kb.TenantID, doc.KbID)
if err := s.deleteDocRecordWithCounters(doc, kb.ID); err != nil {
return err
}
s.cleanupFileReferences(docID)
return nil
}
// RemoveDocumentKeepFile removes a document's chunks/metadata and the document
// row, decrementing the KB counters (doc_num/chunk_num/token_num), WITHOUT
// deleting the underlying file record, its storage blob, or its file2document
// mappings. Mirrors Python DocumentService.remove_document — the caller is
// responsible for cleaning up the file2document mappings separately.
func (s *DocumentService) RemoveDocumentKeepFile(docID string) error {
doc, kb, err := s.resolveDocAndKB(docID)
if err != nil {
return err
}
if _, delErr := s.taskDAO.DeleteByDocIDs([]string{docID}); delErr != nil {
common.Logger.Warn(fmt.Sprintf("RemoveDocumentKeepFile: failed to delete tasks for %s: %v", docID, delErr))
}
s.deleteDocEngineData(docID, kb.TenantID, doc.KbID)
return s.deleteDocRecordWithCounters(doc, kb.ID)
}
// InsertDocument creates a document row and increments the owning KB's doc_num
// counter in a single transaction. Mirrors Python DocumentService.insert, which
// updates dataset/document counters on insert. The document's ID and timestamps
// are populated by the caller / model hooks before insertion.
func (s *DocumentService) InsertDocument(doc *entity.Document) error {
return dao.DB.Transaction(func(tx *gorm.DB) error {
if err := tx.Create(doc).Error; err != nil {
return fmt.Errorf("failed to create document: %w", err)
}
// Guard the counter bump with RowsAffected: documents.kb_id has no DB-level
// FK, so Create can succeed against a non-existent KB and the Update would
// then report a nil error with 0 rows touched, silently desyncing doc_num.
// Roll the whole transaction back in that case (mirrors the counter checks
// in deleteDocRecordWithCounters).
result := tx.Model(&entity.Knowledgebase{}).
Where("id = ?", doc.KbID).
Update("doc_num", gorm.Expr("doc_num + 1"))
if result.Error != nil {
return fmt.Errorf("failed to increment doc_num for KB %s: %w", doc.KbID, result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("knowledgebase %s not found", doc.KbID)
}
return nil
})
}
// resolveDocAndKB loads the document and its knowledgebase, returning both or
// an error.
func (s *DocumentService) resolveDocAndKB(docID string) (*entity.Document, *entity.Knowledgebase, error) {
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
return nil, nil, fmt.Errorf("document not found: %w", err)
}
kb, err := s.kbDAO.GetByID(doc.KbID)
if err != nil {
return nil, nil, fmt.Errorf("knowledgebase not found: %w", err)
}
return doc, kb, nil
}
// deleteDocEngineData removes chunks and metadata from the document engine.
// No-op when the engine is nil.
func (s *DocumentService) deleteDocEngineData(docID, tenantID, kbID string) {
if s.docEngine == nil {
return
}
ctx := context.Background()
indexName := fmt.Sprintf("ragflow_%s", tenantID)
if _, delErr := s.docEngine.DeleteChunks(ctx, map[string]interface{}{"doc_id": docID}, indexName, kbID); delErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocEngineData: failed to delete chunks for %s: %v", docID, delErr))
}
if s.metadataSvc != nil {
_ = s.DeleteDocumentAllMetadata(docID) // logs internally
}
}
// deleteDocRecordWithCounters hard-deletes the document row and decrements the
// KB counters in a single transaction. Counters are only decremented when a
// document row was actually removed (RowsAffected > 0), guarding against
// double-decrement on retries or concurrent deletes.
func (s *DocumentService) deleteDocRecordWithCounters(doc *entity.Document, kbID string) error {
return dao.DB.Transaction(func(tx *gorm.DB) error {
result := tx.Where("id = ?", doc.ID).Delete(&entity.Document{})
if result.Error != nil {
return fmt.Errorf("failed to delete document %s: %w", doc.ID, result.Error)
}
if result.RowsAffected == 0 {
return nil // already deleted by a concurrent request — skip counters
}
decErr := tx.Model(&entity.Knowledgebase{}).
Where("id = ?", kbID).
Updates(map[string]interface{}{
"doc_num": gorm.Expr("doc_num - 1"),
"chunk_num": gorm.Expr("chunk_num - ?", doc.ChunkNum),
"token_num": gorm.Expr("token_num - ?", doc.TokenNum),
}).Error
if decErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocRecordWithCounters: failed to decrement KB %s: %v", kbID, decErr))
}
return nil
})
}
// cleanupFileReferences deletes file2document mappings for docID, and for each
// referenced file, only hard-deletes the file record and its storage blob when
// no other document still references the same file_id.
func (s *DocumentService) cleanupFileReferences(docID string) {
mappings, mapErr := s.file2DocumentDAO.GetByDocumentID(docID)
if mapErr != nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to get f2d mappings for %s: %v", docID, mapErr))
}
if len(mappings) == 0 {
return
}
// Collect unique file_ids
seen := make(map[string]bool)
var fileIDs []string
for _, m := range mappings {
if m.FileID == nil || seen[*m.FileID] {
continue
}
seen[*m.FileID] = true
fileIDs = append(fileIDs, *m.FileID)
}
// Delete all file2document rows for this document
if delErr := s.file2DocumentDAO.DeleteByDocumentID(docID); delErr != nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to delete f2d for %s: %v", docID, delErr))
}
// For each file, only delete the record and blob when no other doc references it
for _, fileID := range fileIDs {
remaining, remErr := s.file2DocumentDAO.GetByFileID(fileID)
if remErr != nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to check remaining f2d for %s: %v", fileID, remErr))
continue
}
if len(remaining) > 0 {
continue
}
fileDAO := dao.NewFileDAO()
file, fErr := fileDAO.GetByID(fileID)
if fErr != nil || file == nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: file not found %s: %v", fileID, fErr))
continue
}
if _, delErr := fileDAO.DeleteByIDs([]string{fileID}); delErr != nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to delete file %s: %v", fileID, delErr))
continue // keep the blob so the live file row still has its object
}
if file.Location != nil && *file.Location != "" {
storageImpl := storage.GetStorageFactory().GetStorage()
if storageImpl != nil {
if rmErr := storageImpl.Remove(file.ParentID, *file.Location); rmErr != nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to remove blob %s/%s: %v", file.ParentID, *file.Location, rmErr))
}
}
}
}
}
// ListDocuments list documents
func (s *DocumentService) ListDocuments(page, pageSize int) ([]*DocumentResponse, int64, error) {
offset := (page - 1) * pageSize
documents, total, err := s.documentDAO.List(offset, pageSize)
if err != nil {
return nil, 0, err
}
responses := make([]*DocumentResponse, len(documents))
for i, doc := range documents {
responses[i] = s.toResponse(doc)
}
return responses, total, nil
}
func (s *DocumentService) GetThumbnail(docID string) (*ThumbnailResponse, error) {
document, err := s.documentDAO.GetByID(docID)
if err != nil {
return nil, err
}
var result ThumbnailResponse
result.ID = document.ID
result.Thumbnail = document.Thumbnail
result.KbID = document.KbID
return &result, nil
}
// ListDocumentsByDatasetID list documents by knowledge base ID
func (s *DocumentService) ListDocumentsByDatasetID(kbID string, page, pageSize int) ([]*entity.DocumentListItem, int64, error) {
offset := (page - 1) * pageSize
documents, total, err := s.documentDAO.ListByKBID(kbID, offset, pageSize)
if err != nil {
return nil, 0, err
}
responses := make([]*entity.DocumentListItem, len(documents))
for i, doc := range documents {
responses[i] = doc
}
return responses, total, nil
}
// GetDocumentsByAuthorID get documents by author ID
func (s *DocumentService) GetDocumentsByAuthorID(authorID, page, pageSize int) ([]*DocumentResponse, int64, error) {
offset := (page - 1) * pageSize
documents, total, err := s.documentDAO.GetByAuthorID(fmt.Sprintf("%d", authorID), offset, pageSize)
if err != nil {
return nil, 0, err
}
responses := make([]*DocumentResponse, len(documents))
for i, doc := range documents {
responses[i] = s.toResponse(doc)
}
return responses, total, nil
}
func (s *DocumentService) ListIngestionTasks(userID string, datasetID *string, page, pageSize int) ([]*entity.IngestionTask, error) {
offset := (page - 1) * pageSize
var tasks []*entity.IngestionTask
var err error
if datasetID == nil {
tasks, err = s.ingestionTaskDAO.ListByUserID(userID, offset, pageSize)
} else {
tasks, err = s.ingestionTaskDAO.ListByUserIDAndDatasetID(userID, *datasetID, offset, pageSize)
}
if err != nil {
return nil, err
}
return tasks, nil
}
type ParseDocumentResponse struct {
DocumentID string `json:"document_id"`
Result string `json:"result"`
}
func (s *DocumentService) IngestDocuments(datasetID, userID string, docIDs []string) ([]*ParseDocumentResponse, error) {
// deduplicate the document id
uniqueDocIDs := common.Deduplicate(docIDs)
if uniqueDocIDs == nil || len(uniqueDocIDs) == 0 {
return nil, fmt.Errorf("no documents to parse")
}
var responses []*ParseDocumentResponse
// query database, if the document ids are valid
for _, docID := range uniqueDocIDs {
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
errorMessage := err.Error()
responses = append(responses, &ParseDocumentResponse{
DocumentID: docID,
Result: errorMessage,
})
continue
}
if doc == nil {
errorMessage := "no such document"
responses = append(responses, &ParseDocumentResponse{
DocumentID: docID,
Result: errorMessage,
})
continue
}
task := &entity.IngestionTask{
DocumentID: docID,
UserID: userID,
DatasetID: datasetID,
Schema: nil,
Status: common.CREATED,
}
// save the task to database
task, err = s.ingestionTaskDAO.CheckAndCreate(task)
if err != nil {
errorMessage := err.Error()
responses = append(responses, &ParseDocumentResponse{
DocumentID: docID,
Result: errorMessage,
})
continue
}
msgQueueEngine := engine.GetMessageQueueEngine()
taskMessage := common.TaskMessage{
TaskID: task.ID,
TaskType: common.TaskTypeIngestionTask,
}
// convert task
taskMessageStr, err := json.Marshal(taskMessage)
if err != nil {
return nil, err
}
err = msgQueueEngine.PublishTask("tasks.RAGFLOW", taskMessageStr)
if err != nil {
return nil, err
}
responses = append(responses, &ParseDocumentResponse{
DocumentID: docID,
Result: fmt.Sprintf("task_id: %s", task.ID),
})
}
common.Info(fmt.Sprintf("parse documents, dataset: %s, documents: %v", datasetID, docIDs))
return responses, nil
}
func (s *DocumentService) StopIngestionTasks(tasks []string, userID string) ([]*entity.IngestionTask, error) {
var taskResponses []*entity.IngestionTask
for _, taskID := range tasks {
task, err := s.ingestionTaskDAO.SetStoppingByAPIServer(taskID)
if err != nil {
return nil, err
}
taskResponses = append(taskResponses, task)
}
return taskResponses, nil
}
func (s *DocumentService) RemoveIngestionTasks(tasks []string, userID string) ([]map[string]string, error) {
var deletedTasks []map[string]string
for _, taskID := range tasks {
taskRecord := map[string]string{
"task_id": taskID,
}
_, err := s.ingestionTaskDAO.RemoveByAPIServerOrAdminServer(taskID, &userID)
if err != nil {
taskRecord["remove"] = fmt.Sprintf("fail: %s", err.Error())
} else {
taskRecord["remove"] = "success"
}
deletedTasks = append(deletedTasks, taskRecord)
}
return deletedTasks, nil
}
func (s *DocumentService) ParseDocuments(datasetID, userID string, docIDs []string) ([]*ParseDocumentResponse, error) {
// create document parse id
// save to task table
// send to message queue
// deduplicate the document id
uniqueDocIDs := common.Deduplicate(docIDs)
if uniqueDocIDs == nil || len(uniqueDocIDs) == 0 {
return nil, fmt.Errorf("no documents to parse")
}
var responses []*ParseDocumentResponse
// query database, if the document ids are valid
for _, docID := range uniqueDocIDs {
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
errorMessage := err.Error()
responses = append(responses, &ParseDocumentResponse{
DocumentID: docID,
Result: errorMessage,
})
continue
}
if doc == nil {
errorMessage := "no such document"
responses = append(responses, &ParseDocumentResponse{
DocumentID: docID,
Result: errorMessage,
})
continue
}
if doc.Status != nil && *doc.Status != "0" {
errorMessage := fmt.Sprintf("document %s is already parsed", docID)
responses = append(responses, &ParseDocumentResponse{
DocumentID: docID,
Result: errorMessage,
})
continue
}
// create task for each document
//task := &entity.IngestionTask{
// ID: utility.GenerateToken(),
// DocumentID: docID,
// UserID: userID,
//}
// save the task to database
//err = s.ingestionTaskDAO.Create(task)
//if err != nil {
// errorMessage := err.Error()
// responses = append(responses, &ParseDocumentResponse{
// DocumentID: docID,
// Result: &errorMessage,
// })
// continue
//}
// Send task to message queue
}
common.Info(fmt.Sprintf("parse documents, dataset: %s, documents: %v", datasetID, docIDs))
return responses, nil
}
// StopParseDocuments stops parsing for the given documents in a dataset.
// It sets Redis cancel signals for associated tasks and updates doc.run to CANCEL.
// Returns a map with success_count and optionally errors.
func (s *DocumentService) StopParseDocuments(datasetID string, docIDs []string) (map[string]interface{}, error) {
deduped := common.Deduplicate(docIDs)
if len(deduped) == 0 {
return nil, fmt.Errorf("no document IDs provided")
}
docs, err := s.validateDocsInDataset(deduped, datasetID)
if err != nil {
return nil, err
}
var errors []string
successCount := 0
for _, doc := range docs {
if cancelErr := s.cancelDocParse(doc); cancelErr != nil {
errors = append(errors, cancelErr.Error())
continue
}
successCount++
}
result := map[string]interface{}{"success_count": successCount}
if len(errors) > 0 {
result["errors"] = errors
}
return result, nil
}
// validateDocsInDataset deduplicates IDs, fetches the documents, and ensures
// every document exists and belongs to the given dataset. Returns the resolved
// documents.
func (s *DocumentService) validateDocsInDataset(docIDs []string, datasetID string) ([]*entity.Document, error) {
docs, err := s.documentDAO.GetByIDs(docIDs)
if err != nil {
return nil, fmt.Errorf("failed to fetch documents: %w", err)
}
if len(docs) != len(docIDs) {
return nil, fmt.Errorf("some document IDs not found in dataset %s", datasetID)
}
var invalid []string
for _, d := range docs {
if d.KbID != datasetID {
invalid = append(invalid, d.ID)
}
}
if len(invalid) > 0 {
return nil, fmt.Errorf("these documents do not belong to dataset %s: %v", datasetID, invalid)
}
return docs, nil
}
// cancelDocParse sets Redis cancel signals for the document's active tasks and
// marks the document run status as CANCEL. Returns an error if the document is
// not in a cancellable state or the status update fails.
func (s *DocumentService) cancelDocParse(doc *entity.Document) error {
tasks, taskErr := s.taskDAO.GetByDocID(doc.ID)
if taskErr != nil {
return fmt.Errorf("failed to get tasks for %s: %v", doc.ID, taskErr)
}
hasUnfinishedTask := false
for _, t := range tasks {
if t.Progress < 1 {
hasUnfinishedTask = true
break
}
}
canCancel := false
if doc.Run != nil {
if *doc.Run == string(entity.TaskStatusRunning) || *doc.Run == string(entity.TaskStatusCancel) {
canCancel = true
}
}
if hasUnfinishedTask {
canCancel = true
}
if !canCancel {
return fmt.Errorf("can't stop parsing document that has not started or already completed")
}
// Set Redis cancel signal for each task (best-effort)
redisClient := redis.Get()
for _, t := range tasks {
if redisClient != nil {
redisClient.Set(fmt.Sprintf("%s-cancel", t.ID), "x", 0)
}
}
if upErr := s.documentDAO.UpdateByID(doc.ID, map[string]interface{}{"run": string(entity.TaskStatusCancel)}); upErr != nil {
return fmt.Errorf("failed to update document %s: %v", doc.ID, upErr)
}
return nil
}
// toResponse convert model.Document to DocumentResponse
func (s *DocumentService) toResponse(doc *entity.Document) *DocumentResponse {
createdAt := ""
if doc.CreateTime != nil {
// Check if timestamp is in milliseconds (13 digits) or seconds (10 digits)
var ts int64
if *doc.CreateTime > 1000000000000 {
// Milliseconds - convert to seconds
ts = *doc.CreateTime / 1000
} else {
ts = *doc.CreateTime
}
createdAt = time.Unix(ts, 0).Format("2006-01-02 15:04:05")
}
updatedAt := ""
if doc.UpdateTime != nil {
// Accept both historical second-based values and current millisecond-based values.
ts := *doc.UpdateTime
if ts > 1000000000000 {
ts /= 1000
}
updatedAt = time.Unix(ts, 0).Format("2006-01-02 15:04:05")
}
return &DocumentResponse{
ID: doc.ID,
Name: doc.Name,
KbID: doc.KbID,
ParserID: doc.ParserID,
PipelineID: doc.PipelineID,
Type: doc.Type,
SourceType: doc.SourceType,
CreatedBy: doc.CreatedBy,
Location: doc.Location,
Size: doc.Size,
TokenNum: doc.TokenNum,
ChunkNum: doc.ChunkNum,
Progress: doc.Progress,
ProgressMsg: doc.ProgressMsg,
ProcessDuration: doc.ProcessDuration,
Suffix: doc.Suffix,
Run: doc.Run,
Status: doc.Status,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
}
}
// GetMetadataSummaryRequest request for metadata summary
type GetMetadataSummaryRequest struct {
KBID string `json:"kb_id" binding:"required"`
DocIDs []string `json:"doc_ids"`
}
// GetMetadataSummaryResponse response for metadata summary
type GetMetadataSummaryResponse struct {
Summary map[string]interface{} `json:"summary"`
}
// GetMetadataSummary get metadata summary for documents
func (s *DocumentService) GetMetadataSummary(kbID string, docIDs []string) (map[string]interface{}, error) {
tenantID, err := s.metadataSvc.GetTenantIDByKBID(kbID)
if err != nil {
return nil, err
}
searchResult, err := s.metadataSvc.SearchMetadata(kbID, tenantID, docIDs, 1000)
if err != nil {
return nil, err
}
// Aggregate metadata from results
return aggregateMetadata(searchResult.MetadataRecords), nil
}
// SetDocumentMetadata sets metadata for a document in the document engine
func (s *DocumentService) SetDocumentMetadata(docID string, meta map[string]interface{}) error {
// Get document to find kb_id
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
return fmt.Errorf("document not found: %w", err)
}
// Get tenant ID
tenantID, err := s.metadataSvc.GetTenantIDByKBID(doc.KbID)
if err != nil {
return fmt.Errorf("failed to get tenant ID: %w", err)
}
// Update metadata using the document engine (merges with existing)
err = s.docEngine.UpdateMetadata(nil, docID, doc.KbID, meta, tenantID)
if err != nil {
return fmt.Errorf("failed to update metadata: %w", err)
}
return nil
}
// DeleteDocumentMetadata deletes metadata keys for a document in the document engine
func (s *DocumentService) DeleteDocumentMetadata(docID string, keys []string) error {
// Get document to find kb_id
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
return fmt.Errorf("document not found: %w", err)
}
// Get tenant ID
tenantID, err := s.metadataSvc.GetTenantIDByKBID(doc.KbID)
if err != nil {
return fmt.Errorf("failed to get tenant ID: %w", err)
}
// Delete metadata using the document engine
err = s.docEngine.DeleteMetadataKeys(nil, docID, doc.KbID, keys, tenantID)
if err != nil {
return fmt.Errorf("failed to delete metadata: %w", err)
}
return nil
}
// DeleteDocumentAllMetadata deletes all metadata for a document in the document engine
func (s *DocumentService) DeleteDocumentAllMetadata(docID string) error {
// Get document to find kb_id
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
return fmt.Errorf("document not found: %w", err)
}
// Get tenant ID
tenantID, err := s.metadataSvc.GetTenantIDByKBID(doc.KbID)
if err != nil {
return fmt.Errorf("failed to get tenant ID: %w", err)
}
// Build condition to match the document
condition := map[string]interface{}{
"id": docID,
"kb_id": doc.KbID,
}
// Delete entire document metadata
_, err = s.docEngine.DeleteMetadata(nil, condition, tenantID)
if err != nil {
return fmt.Errorf("failed to delete document metadata: %w", err)
}
return nil
}
// GetDocumentMetadataByID get metadata for a specific document
func (s *DocumentService) GetDocumentMetadataByID(docID string) (map[string]interface{}, error) {
// Get document to find kb_id
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
return nil, fmt.Errorf("document not found: %w", err)
}
tenantID, err := s.metadataSvc.GetTenantIDByKBID(doc.KbID)
if err != nil {
return nil, err
}
searchResult, err := s.metadataSvc.SearchMetadata(doc.KbID, tenantID, []string{docID}, 1)
if err != nil {
return nil, err
}
// Return metadata if found
if len(searchResult.MetadataRecords) > 0 {
metadata := searchResult.MetadataRecords[0]
return ExtractMetaFields(metadata)
}
return make(map[string]interface{}), nil
}
// GetMetadataByKBs get metadata for knowledge bases
func (s *DocumentService) GetMetadataByKBs(kbIDs []string) (map[string]interface{}, error) {
if len(kbIDs) == 0 {
return make(map[string]interface{}), nil
}
searchResult, err := s.metadataSvc.SearchMetadataByKBs(kbIDs, 10000)
if err != nil {
return nil, err
}
flattenedMeta := make(map[string]map[string][]string)
numMetadata := len(searchResult.MetadataRecords)
var allMetaFields []map[string]interface{}
if numMetadata > 1 && len(searchResult.MetadataRecords) > 0 {
firstMetadata := searchResult.MetadataRecords[0]
if metaFieldsVal := firstMetadata["meta_fields"]; metaFieldsVal != nil {
if v, ok := metaFieldsVal.([]byte); ok {
allMetaFields = ParseAllLengthPrefixedJSON(v)
}
}
}
for idx, metadata := range searchResult.MetadataRecords {
docID, ok := ExtractDocumentID(metadata)
if !ok {
continue
}
var metaFields map[string]interface{}
var metaFieldsVal interface{}
if len(allMetaFields) > 0 && idx < len(allMetaFields) {
// Use pre-parsed meta_fields from concatenated data
metaFields = allMetaFields[idx]
} else {
// Normal case - get from chunk
metaFieldsVal = metadata["meta_fields"]
if metaFieldsVal != nil {
switch v := metaFieldsVal.(type) {
case string:
if err := json.Unmarshal([]byte(v), &metaFields); err != nil {
continue
}
case []byte:
// Try direct JSON parse first
if err := json.Unmarshal(v, &metaFields); err != nil {
// Try to parse as concatenated JSON objects
metaFields = ParseLengthPrefixedJSON(v)
}
case map[string]interface{}:
metaFields = v
default:
continue
}
}
}
if metaFields == nil {
continue
}
// Process each metadata field
for fieldName, fieldValue := range metaFields {
if fieldName == "kb_id" || fieldName == "id" {
continue
}
if _, ok := flattenedMeta[fieldName]; !ok {
flattenedMeta[fieldName] = make(map[string][]string)
}
// Handle list and single values
var values []interface{}
switch v := fieldValue.(type) {
case []interface{}:
values = v
default:
values = []interface{}{v}
}
for _, val := range values {
if val == nil {
continue
}
strVal := fmt.Sprintf("%v", val)
flattenedMeta[fieldName][strVal] = append(flattenedMeta[fieldName][strVal], docID)
}
}
}
// Convert to map[string]interface{} for return
var metaResult map[string]interface{} = make(map[string]interface{})
for k, v := range flattenedMeta {
metaResult[k] = v
}
return metaResult, nil
}
// valueInfo holds count and order of first appearance
type valueInfo struct {
count int
firstOrder int
}
// aggregateMetadata aggregates metadata from search results
func aggregateMetadata(chunks []map[string]interface{}) map[string]interface{} {
// summary: map[fieldName]map[value]valueInfo
summary := make(map[string]map[string]valueInfo)
typeCounter := make(map[string]map[string]int)
orderCounter := 0
for _, chunk := range chunks {
// For metadata table, the actual metadata is in the "meta_fields" JSON field
// Extract it first
metaFieldsVal := chunk["meta_fields"]
if metaFieldsVal == nil {
continue
}
// Parse meta_fields - could be a string (JSON) or a map
var metaFields map[string]interface{}
switch v := metaFieldsVal.(type) {
case string:
// Parse JSON string
if err := json.Unmarshal([]byte(v), &metaFields); err != nil {
continue
}
case []byte:
// Handle byte slice - Infinity returns concatenated JSON objects with length prefixes
rawBytes := v
// Try to detect and handle length-prefixed format
// Format: [4-byte length][JSON][4-byte length][JSON]...
parsedMetaFields := make(map[string]interface{})
offset := 0
for offset < len(rawBytes) {
// Need at least 4 bytes for length prefix
if offset+4 > len(rawBytes) {
break
}
// Read 4-byte length (little-endian, not big-endian!)
length := uint32(rawBytes[offset]) | uint32(rawBytes[offset+1])<<8 |
uint32(rawBytes[offset+2])<<16 | uint32(rawBytes[offset+3])<<24
// Check if length looks valid (not too large)
if length > 10000 || length == 0 {
// Try to find next '{' from current position
nextBrace := -1
for i := offset; i < len(rawBytes) && i < offset+100; i++ {
if rawBytes[i] == '{' {
nextBrace = i
break
}
}
if nextBrace > offset {
// Skip to the next '{'
offset = nextBrace
continue
}
break
}
// Extract JSON data
jsonStart := offset + 4
jsonEnd := jsonStart + int(length)
if jsonEnd > len(rawBytes) {
jsonEnd = len(rawBytes)
}
jsonBytes := rawBytes[jsonStart:jsonEnd]
// Try to parse this JSON
var singleMeta map[string]interface{}
if err := json.Unmarshal(jsonBytes, &singleMeta); err == nil {
// Merge metadata from this document
for k, vv := range singleMeta {
if existing, ok := parsedMetaFields[k]; ok {
// Combine values
if existList, ok := existing.([]interface{}); ok {
if newList, ok := vv.([]interface{}); ok {
parsedMetaFields[k] = append(existList, newList...)
} else {
parsedMetaFields[k] = append(existList, vv)
}
} else {
parsedMetaFields[k] = []interface{}{existing, vv}
}
} else {
parsedMetaFields[k] = vv
}
}
}
offset = jsonEnd
}
// If we successfully parsed multiple JSON objects, use the merged result
if len(parsedMetaFields) > 0 {
metaFields = parsedMetaFields
} else {
// Fallback: try the original parsing method
startIdx := -1
for i, b := range rawBytes {
if b == '{' {
startIdx = i
break
}
}
if startIdx > 0 {
strVal := string(rawBytes[startIdx:])
if err := json.Unmarshal([]byte(strVal), &metaFields); err != nil {
metaFields = map[string]interface{}{"raw": strVal}
}
} else if err := json.Unmarshal(rawBytes, &metaFields); err != nil {
metaFields = map[string]interface{}{"raw": string(rawBytes)}
}
}
case map[string]interface{}:
metaFields = v
default:
continue
}
// Now iterate over the extracted metadata fields
for k, v := range metaFields {
// Skip nil values
if v == nil {
continue
}
// Determine value type
valueType := getMetaValueType(v)
// Track type counts
if valueType != "" {
if _, ok := typeCounter[k]; !ok {
typeCounter[k] = make(map[string]int)
}
typeCounter[k][valueType] = typeCounter[k][valueType] + 1
}
// Aggregate value counts
values := v
if v, ok := v.([]interface{}); ok {
values = v
} else {
values = []interface{}{v}
}
for _, vv := range values.([]interface{}) {
if vv == nil {
continue
}
sv := fmt.Sprintf("%v", vv)
if _, ok := summary[k]; !ok {
summary[k] = make(map[string]valueInfo)
}
if existing, ok := summary[k][sv]; ok {
// Already exists, just increment count
existing.count++
summary[k][sv] = existing
} else {
// First time seeing this value - record order
summary[k][sv] = valueInfo{count: 1, firstOrder: orderCounter}
orderCounter++
}
}
}
}
// Build result with type information and sorted values
result := make(map[string]interface{})
for k, v := range summary {
// Sort by count descending, then by firstOrder ascending (to match Python stable sort)
// values: [value, count, firstOrder]
values := make([][3]interface{}, 0, len(v))
for val, info := range v {
values = append(values, [3]interface{}{val, info.count, info.firstOrder})
}
// Use stable sort - sort by count descending, then by firstOrder
sort.SliceStable(values, func(i, j int) bool {
cntI := values[i][1].(int)
cntJ := values[j][1].(int)
if cntI != cntJ {
return cntI > cntJ // count descending
}
// If counts equal, use firstOrder ascending (earlier appearance first)
return values[i][2].(int) < values[j][2].(int)
})
// Determine dominant type
valueType := "string"
if typeCounts, ok := typeCounter[k]; ok {
maxCount := 0
for t, c := range typeCounts {
if c > maxCount {
maxCount = c
valueType = t
}
}
}
// Convert from [value, count, firstOrder] to [value, count] for output
outputValues := make([][2]interface{}, len(values))
for i, val := range values {
outputValues[i] = [2]interface{}{val[0], val[1]}
}
result[k] = map[string]interface{}{
"type": valueType,
"values": outputValues,
}
}
return result
}
// getMetaValueType determines the type of a metadata value
func getMetaValueType(value interface{}) string {
if value == nil {
return ""
}
switch v := value.(type) {
case []interface{}:
if len(v) > 0 {
return "list"
}
return ""
case bool:
return "string"
case int, int8, int16, int32, int64:
return "number"
case float32, float64:
return "number"
case string:
if isTimeString(v) {
return "time"
}
return "string"
}
return "string"
}
// isTimeString checks if a string is an ISO 8601 datetime
func isTimeString(s string) bool {
matched, _ := regexp.MatchString(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$`, s)
return matched
}
func (s *DocumentService) UpdateDatasetDocument(userID, datasetID, documentID string, req *UpdateDatasetDocumentRequest, present map[string]bool) (*UpdateDatasetDocumentResponse, common.ErrorCode, error) {
tenantID := userID
kb, err := s.kbDAO.GetByIDAndTenantID(datasetID, tenantID)
if err != nil {
if dao.IsNotFoundErr(err) {
return nil, common.CodeDataError, errors.New("You don't own the dataset.")
}
return nil, common.CodeDataError, errors.New("Can't find this dataset!")
}
doc, err := s.documentDAO.GetByDocumentIDAndDatasetID(documentID, datasetID)
if err != nil {
if dao.IsNotFoundErr(err) {
return nil, common.CodeDataError, errors.New("The dataset doesn't own the document.")
}
return nil, common.CodeServerError, err
}
if code, err := s.validateDatasetDocumentUpdate(doc, req, present); err != nil {
return nil, code, err
}
if present["meta_fields"] {
if err := s.replaceDocumentMetadata(documentID, req.MetaFields); err != nil {
return nil, common.CodeDataError, err
}
}
if present["name"] && req.Name != nil && (doc.Name == nil || *req.Name != *doc.Name) {
if err := s.updateDocumentNameOnly(doc, kb.TenantID, *req.Name); err != nil {
return nil, common.CodeDataError, err
}
}
if present["parser_config"] && req.ParserConfig != nil {
if err := s.updateDocumentParserConfig(doc.ID, req.ParserConfig); err != nil {
return nil, common.CodeDataError, err
}
}
if req.PipelineID != nil && *req.PipelineID != "" {
if err := s.resetDocumentForReparse(doc, kb.TenantID, nil, req.PipelineID); err != nil {
return nil, common.CodeDataError, err
}
} else if present["parser_id"] && req.ParserID != nil && strings.TrimSpace(*req.ParserID) != "" {
parserID := strings.TrimSpace(*req.ParserID)
if err := s.resetDocumentForReparse(doc, kb.TenantID, &parserID, nil); err != nil {
return nil, common.CodeDataError, err
}
} else if req.ChunkMethod != nil && *req.ChunkMethod != "" {
if err := s.updateChunkMethod(doc, kb.TenantID, *req.ChunkMethod, req.ParserConfig, present["parser_config"]); err != nil {
return nil, common.CodeDataError, err
}
}
if present["enabled"] && req.Enabled != nil {
if err := s.updateDocumentStatusOnly(doc, kb, *req.Enabled); err != nil {
return nil, common.CodeServerError, err
}
}
updatedDoc, err := s.documentDAO.GetByID(doc.ID)
if err != nil {
if dao.IsNotFoundErr(err) {
return nil, common.CodeDataError, fmt.Errorf("Can not get document by id:%s", doc.ID)
}
return nil, common.CodeDataError, errors.New("Database operation failed")
}
metaFields := map[string]interface{}{}
if s.docEngine != nil && s.metadataSvc != nil {
metaFields, _ = s.GetDocumentMetadataByID(updatedDoc.ID)
}
return s.toUpdateDatasetDocumentResponse(updatedDoc, metaFields), common.CodeSuccess, nil
}
var allowedDocumentChunkMethods = map[string]struct{}{
"naive": {},
"manual": {},
"qa": {},
"table": {},
"paper": {},
"book": {},
"laws": {},
"presentation": {},
"picture": {},
"one": {},
"knowledge_graph": {},
"email": {},
"tag": {},
}
func (s *DocumentService) validateDatasetDocumentUpdate(doc *entity.Document, req *UpdateDatasetDocumentRequest, present map[string]bool) (common.ErrorCode, error) {
if req == nil {
return common.CodeDataError, errors.New("Invalid request payload")
}
if present["chunk_count"] && req.ChunkCount != nil && *req.ChunkCount != 0 && *req.ChunkCount != doc.ChunkNum {
return common.CodeDataError, errors.New("Can't change `chunk_count`.")
}
if present["token_count"] && req.TokenCount != nil && *req.TokenCount != 0 && *req.TokenCount != doc.TokenNum {
return common.CodeDataError, errors.New("Can't change `token_count`.")
}
if present["progress"] && req.Progress != nil && *req.Progress != 0 && math.Abs(*req.Progress-doc.Progress) > 1e-9 {
return common.CodeDataError, errors.New("Can't change `progress`.")
}
if present["enabled"] {
if req.Enabled == nil || (*req.Enabled != 0 && *req.Enabled != 1) {
return common.CodeDataError, errors.New("`enabled` value invalid, only accept 0 or 1")
}
}
if present["chunk_method"] {
if req.ChunkMethod == nil || strings.TrimSpace(*req.ChunkMethod) == "" {
return common.CodeDataError, errors.New("`chunk_method` (empty string) is not valid")
}
chunkMethod := strings.TrimSpace(*req.ChunkMethod)
if _, ok := allowedDocumentChunkMethods[chunkMethod]; !ok {
return common.CodeDataError, fmt.Errorf("`chunk_method` %s doesn't exist", chunkMethod)
}
if doc.Type == "visual" || isPresentationFile(doc.Name) {
return common.CodeDataError, errors.New("Not supported yet!")
}
}
if present["parser_id"] && req.ParserID != nil {
parserID := strings.TrimSpace(*req.ParserID)
if (doc.Type == "visual" && parserID != "picture") || (isPresentationFile(doc.Name) && parserID != "presentation") {
return common.CodeDataError, errors.New("Not supported yet!")
}
}
if present["name"] && req.Name != nil {
if err := s.validateDocumentName(doc, *req.Name); err != nil {
return common.CodeDataError, err
}
}
if present["meta_fields"] {
if err := validateMetaFields(req.MetaFields); err != nil {
return common.CodeDataError, err
}
}
return common.CodeSuccess, nil
}
func (s *DocumentService) validateDocumentName(doc *entity.Document, newName string) error {
if strings.TrimSpace(newName) == "" {
return errors.New("File name can't be empty.")
}
if len([]byte(newName)) > 255 {
return errors.New("File name must be 255 bytes or less.")
}
oldName := ""
if doc.Name != nil {
oldName = *doc.Name
}
if strings.ToLower(filepath.Ext(newName)) != strings.ToLower(filepath.Ext(oldName)) {
return errors.New("The extension of file can't be changed")
}
docs, err := s.documentDAO.GetByNameAndKBID(newName, doc.KbID)
if err != nil {
return err
}
for _, d := range docs {
if d.ID != doc.ID && d.Name != nil && *d.Name == newName {
return errors.New("Duplicated document name in the same dataset.")
}
}
return nil
}
func isPresentationFile(name *string) bool {
if name == nil {
return false
}
ext := strings.ToLower(filepath.Ext(*name))
return ext == ".ppt" || ext == ".pptx" || ext == ".pages"
}
func validateMetaFields(meta map[string]any) error {
if meta == nil {
return nil
}
for _, v := range meta {
switch typed := v.(type) {
case string, float64, int, int64, float32:
continue
case []any:
for _, item := range typed {
switch item.(type) {
case string, float64, int, int64, float32:
continue
default:
return fmt.Errorf("The type is not supported in list: %v", typed)
}
}
default:
return fmt.Errorf("The type is not supported: %v", v)
}
}
return nil
}
func (s *DocumentService) replaceDocumentMetadata(docID string, meta map[string]any) error {
if s.docEngine == nil || s.metadataSvc == nil {
return nil
}
if err := s.DeleteDocumentAllMetadata(docID); err != nil {
return err
}
return s.SetDocumentMetadata(docID, map[string]interface{}(meta))
}
func (s *DocumentService) updateDocumentNameOnly(doc *entity.Document, tenantID, newName string) error {
if err := s.documentDAO.UpdateByID(doc.ID, map[string]interface{}{"name": newName}); err != nil {
return errors.New("Database error (Document rename)!")
}
mappings, err := s.file2DocumentDAO.GetByDocumentID(doc.ID)
if err == nil && len(mappings) > 0 && mappings[0].FileID != nil && s.fileDAO != nil {
_ = s.fileDAO.UpdateByID(*mappings[0].FileID, map[string]interface{}{"name": newName})
}
if s.docEngine == nil {
return nil
}
titleTks, _ := tokenizer.Tokenize(newName)
titleSmTks, _ := tokenizer.FineGrainedTokenize(titleTks)
indexName := fmt.Sprintf("ragflow_%s", tenantID)
return s.docEngine.UpdateChunks(
context.Background(),
map[string]interface{}{"doc_id": doc.ID},
map[string]interface{}{
"docnm_kwd": newName,
"title_tks": titleTks,
"title_sm_tks": titleSmTks,
},
indexName,
doc.KbID,
)
}
func (s *DocumentService) updateDocumentParserConfig(documentID string, config map[string]any) error {
if len(config) == 0 {
return nil
}
doc, err := s.documentDAO.GetByID(documentID)
if err != nil {
return fmt.Errorf("Document(%s) not found.", documentID)
}
merged := common.DeepMergeMaps(map[string]interface{}(doc.ParserConfig), map[string]interface{}(config))
if _, ok := config["raptor"]; !ok {
delete(merged, "raptor")
}
return s.documentDAO.UpdateByID(documentID, map[string]interface{}{
"parser_config": entity.JSONMap(merged),
})
}
func (s *DocumentService) resetDocumentForReparse(doc *entity.Document, tenantID string, parserID *string, pipelineID *string) error {
progressMsg := ""
run := string(entity.TaskStatusUnstart)
updates := map[string]interface{}{
"progress": 0,
"progress_msg": progressMsg,
"run": run,
}
if parserID != nil {
updates["parser_id"] = *parserID
}
if pipelineID != nil {
updates["pipeline_id"] = *pipelineID
}
if err := s.documentDAO.UpdateByID(doc.ID, updates); err != nil {
return errors.New("Document not found!")
}
if doc.TokenNum > 0 {
decremented, err := s.decrementDocumentAndKBCountersForReparse(doc)
if err != nil {
return errors.New("Document not found!")
}
if !decremented {
return nil
}
if s.docEngine != nil {
indexName := fmt.Sprintf("ragflow_%s", tenantID)
s.deleteChunkImages(doc, indexName)
if _, err := s.docEngine.DeleteChunks(context.Background(), map[string]interface{}{"doc_id": doc.ID}, indexName, doc.KbID); err != nil {
return err
}
}
}
return nil
}
func (s *DocumentService) deleteChunkImages(doc *entity.Document, indexName string) {
if s.docEngine == nil {
return
}
storageImpl := storage.GetStorageFactory().GetStorage()
if storageImpl == nil {
return
}
const pageSize = 1000
for offset := 0; ; offset += pageSize {
result, err := s.docEngine.Search(context.Background(), &enginetypes.SearchRequest{
IndexNames: []string{indexName},
KbIDs: []string{doc.KbID},
Offset: offset,
Limit: pageSize,
SelectFields: []string{"id", "img_id"},
Filter: map[string]interface{}{"doc_id": doc.ID},
MatchExprs: nil,
OrderBy: nil,
RankFeature: nil,
})
if err != nil || result == nil || len(result.Chunks) == 0 {
return
}
for _, chunk := range result.Chunks {
imageKey, ok := chunkImageStorageKey(doc.KbID, chunk)
if !ok {
continue
}
if storageImpl.ObjExist(doc.KbID, imageKey) {
_ = storageImpl.Remove(doc.KbID, imageKey)
}
}
}
}
func chunkImageStorageKey(defaultBucket string, chunk map[string]interface{}) (string, bool) {
imgID := firstStringField(chunk, "img_id")
if imgID != "" {
prefix := defaultBucket + "-"
if strings.HasPrefix(imgID, prefix) && len(imgID) > len(prefix) {
return strings.TrimPrefix(imgID, prefix), true
}
return imgID, true
}
chunkID := firstStringField(chunk, "id", "_id")
if chunkID == "" {
return "", false
}
return chunkID, true
}
func firstStringField(m map[string]interface{}, keys ...string) string {
for _, key := range keys {
if value, ok := m[key]; ok {
if s, ok := value.(string); ok {
return s
}
}
}
return ""
}
func (s *DocumentService) decrementDocumentAndKBCountersForReparse(doc *entity.Document) (bool, error) {
decremented := false
err := dao.DB.Transaction(func(tx *gorm.DB) error {
result := tx.Model(&entity.Document{}).
Where("id = ? AND kb_id = ? AND token_num = ? AND chunk_num = ?", doc.ID, doc.KbID, doc.TokenNum, doc.ChunkNum).
Updates(map[string]interface{}{
"token_num": gorm.Expr("token_num - ?", doc.TokenNum),
"chunk_num": gorm.Expr("chunk_num - ?", doc.ChunkNum),
"process_duration": gorm.Expr("process_duration - ?", doc.ProcessDuration),
})
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
return nil
}
decremented = true
return tx.Model(&entity.Knowledgebase{}).
Where("id = ?", doc.KbID).
Updates(map[string]interface{}{
"token_num": gorm.Expr("token_num - ?", doc.TokenNum),
"chunk_num": gorm.Expr("chunk_num - ?", doc.ChunkNum),
}).Error
})
return decremented, err
}
func (s *DocumentService) updateChunkMethod(doc *entity.Document, tenantID string, chunkMethod string, parserConfig map[string]any, hasParserConfig bool) error {
chunkMethod = strings.TrimSpace(chunkMethod)
if !strings.EqualFold(doc.ParserID, chunkMethod) {
if err := s.resetDocumentForReparse(doc, tenantID, &chunkMethod, nil); err != nil {
return err
}
}
if !hasParserConfig {
defaultConfig := common.GetParserConfig(chunkMethod, nil)
if err := s.updateDocumentParserConfig(doc.ID, defaultConfig); err != nil {
return err
}
}
return nil
}
func (s *DocumentService) updateDocumentStatusOnly(doc *entity.Document, kb *entity.Knowledgebase, status int) error {
statusStr := strconv.Itoa(status)
if doc.Status != nil && *doc.Status == statusStr {
return nil
}
if err := s.documentDAO.UpdateByID(doc.ID, map[string]interface{}{"status": statusStr}); err != nil {
return errors.New("Database error (Document update)!")
}
if s.docEngine == nil {
return nil
}
indexName := fmt.Sprintf("ragflow_%s", kb.TenantID)
return s.docEngine.UpdateChunks(
context.Background(),
map[string]interface{}{"doc_id": doc.ID},
map[string]interface{}{"available_int": status},
indexName,
doc.KbID,
)
}
func (s *DocumentService) toUpdateDatasetDocumentResponse(doc *entity.Document, metaFields map[string]interface{}) *UpdateDatasetDocumentResponse {
if metaFields == nil {
metaFields = map[string]interface{}{}
}
return &UpdateDatasetDocumentResponse{
ID: doc.ID,
Thumbnail: doc.Thumbnail,
DatasetID: doc.KbID,
ChunkMethod: doc.ParserID,
PipelineID: doc.PipelineID,
ParserConfig: map[string]interface{}(doc.ParserConfig),
SourceType: doc.SourceType,
Type: doc.Type,
CreatedBy: doc.CreatedBy,
Name: doc.Name,
Location: doc.Location,
Size: doc.Size,
TokenCount: doc.TokenNum,
ChunkCount: doc.ChunkNum,
Progress: doc.Progress,
ProgressMsg: doc.ProgressMsg,
ProcessBeginAt: doc.ProcessBeginAt,
ProcessDuration: doc.ProcessDuration,
ContentHash: doc.ContentHash,
MetaFields: metaFields,
Suffix: doc.Suffix,
Run: mapDocumentRunStatus(doc.Run),
Status: doc.Status,
CreateTime: doc.CreateTime,
CreateDate: doc.CreateDate,
UpdateTime: doc.UpdateTime,
UpdateDate: doc.UpdateDate,
}
}
func mapDocumentRunStatus(run *string) string {
if run == nil {
return "UNSTART"
}
switch *run {
case string(entity.TaskStatusRunning):
return "RUNNING"
case string(entity.TaskStatusCancel):
return "CANCEL"
case string(entity.TaskStatusDone):
return "DONE"
case string(entity.TaskStatusFail):
return "FAIL"
default:
return "UNSTART"
}
}