feat[Go]: implement POST /api/v1/files/link-to-datasets (#15674)

### What problem does this PR solve?

Closes #15673 — ports the Python `file2document_api.py` `convert()`
endpoint to Go.

| Method | Path | Handler |
|--------|------|---------|
| POST | `/api/v1/files/link-to-datasets` | `FileHandler.LinkToDatasets`
|

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---

#### Implementation notes

**Files changed:**

```
internal/service/file2document.go  – new service (File2DocumentService)
internal/dao/file2document.go      – added Create method
internal/handler/file.go           – FileHandler gains file2DocumentService;
                                     LinkToDatasets HTTP handler
internal/router/router.go          – route registered
```

**Functional parity table:**

| Concern | Go behaviour |
|---------|-------------|
| Required fields | `file_ids` and `kb_ids` both required; missing
either → `CodeDataError` mirroring Python `@validate_request` |
| File existence | `fileDAO.GetByIDs(fileIDs)` builds a set; any missing
ID → `"File not found!"` |
| KB existence | `kbDAO.GetByID(kbID)` per KB; missing → `"Can't find
this dataset!"` |
| Folder expansion | `getAllInnermostFileIDs` recursively calls
`fileDAO.ListByParentID` — mirrors
`FileService.get_all_innermost_file_ids` |
| File permissions | `checkFileTeamPermission`: `file.TenantID ==
userID` OR user in tenant's team — mirrors `check_file_team_permission`
|
| KB permissions | `checkKBTeamPermission`: `kb.TenantID == userID` OR
user in tenant's team — mirrors `check_kb_team_permission` |
| Fire-and-forget | `go convertFiles(...)` goroutine after all
validation passes — mirrors `loop.run_in_executor(None, _convert_files,
…)` |
| Conversion | `convertFiles`: for each file → delete existing mappings
+ hard-delete old documents → create new `Document` in each target KB →
create `File2Document` mapping — mirrors Python `_convert_files` |
| `getParser` | Extension-based lookup with fallback to `kb.ParserID` —
mirrors `FileService.get_parser` |
| Immediate return | `true` returned to caller as soon as goroutine is
scheduled |

---------

Co-authored-by: Yingfeng <yingfeng.zhang@gmail.com>
This commit is contained in:
Hunnyboy1217
2026-06-10 01:46:55 -07:00
committed by GitHub
parent 3796835c4d
commit 16d5b4fa02
5 changed files with 460 additions and 4 deletions

View File

@@ -82,3 +82,8 @@ func (dao *File2DocumentDAO) GetByDocumentID(docID string) ([]*entity.File2Docum
func (dao *File2DocumentDAO) DeleteByDocumentID(docID string) error {
return DB.Unscoped().Where("document_id = ?", docID).Delete(&entity.File2Document{}).Error
}
// Create inserts a new file2document mapping record.
func (dao *File2DocumentDAO) Create(mapping *entity.File2Document) error {
return DB.Create(mapping).Error
}

View File

@@ -17,6 +17,8 @@
package handler
import (
"errors"
"fmt"
"net/http"
"net/url"
"ragflow/internal/common"
@@ -32,15 +34,17 @@ import (
// FileHandler file handler
type FileHandler struct {
fileService *service.FileService
userService *service.UserService
fileService *service.FileService
userService *service.UserService
file2DocumentService *service.File2DocumentService
}
// NewFileHandler create file handler
func NewFileHandler(fileService *service.FileService, userService *service.UserService) *FileHandler {
return &FileHandler{
fileService: fileService,
userService: userService,
fileService: fileService,
userService: userService,
file2DocumentService: service.NewFile2DocumentService(),
}
}
@@ -552,3 +556,69 @@ func (h *FileHandler) Download(c *gin.Context) {
// Send file data
c.Data(http.StatusOK, contentType, blob)
}
// LinkToDatasets links files (or folder trees) to one or more datasets.
// Mirrors Python POST /api/v1/files/link-to-datasets (convert).
// @Summary Link files to datasets
// @Description Associate files with target knowledge-base datasets, re-indexing
// as needed. Folder inputs are expanded to their innermost files.
// The heavy DB work runs in a goroutine; the endpoint returns immediately.
// @Tags file
// @Accept json
// @Produce json
// @Param request body service.LinkToDatasetsRequest true "file_ids and kb_ids"
// @Success 200 {object} map[string]interface{}
// @Router /api/v1/files/link-to-datasets [post]
func (h *FileHandler) LinkToDatasets(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
var req service.LinkToDatasetsRequest
// Tolerate bind errors: a malformed or empty body simply leaves the fields
// empty, which the validate_request-style check below reports as missing
// arguments — matching Python's @validate_request behaviour and code.
_ = c.ShouldBindJSON(&req)
// Mirror Python @validate_request("file_ids", "kb_ids"): missing arguments
// return ARGUMENT_ERROR (101) with data=null and the aggregated message.
var missing []string
if len(req.FileIDs) == 0 {
missing = append(missing, "file_ids")
}
if len(req.KbIDs) == 0 {
missing = append(missing, "kb_ids")
}
if len(missing) > 0 {
jsonError(c, common.CodeArgumentError, fmt.Sprintf("required argument are missing: %s; ", strings.Join(missing, ",")))
return
}
if err := h.file2DocumentService.LinkToDatasets(user.ID, &req); err != nil {
jsonError(c, linkToDatasetsErrorCode(err), err.Error())
return
}
c.JSON(http.StatusOK, gin.H{
"code": common.CodeSuccess,
"data": true,
"message": "success",
})
}
// linkToDatasetsErrorCode maps File2DocumentService sentinel errors to
// Python-compatible response codes. File/dataset-not-found and no-authorization
// use DATA_ERROR (102), matching Python's get_data_error_result in convert();
// any other (internal) error is reported as a server error.
func linkToDatasetsErrorCode(err error) common.ErrorCode {
switch {
case errors.Is(err, service.ErrLinkFileNotFound),
errors.Is(err, service.ErrLinkDatasetNotFound),
errors.Is(err, service.ErrLinkNoAuthorization):
return common.CodeDataError
default:
return common.CodeServerError
}
}

View File

@@ -285,6 +285,7 @@ func (r *Router) Setup(engine *gin.Engine) {
file.GET("", r.fileHandler.ListFiles)
file.DELETE("", r.fileHandler.DeleteFiles)
file.POST("/move", r.fileHandler.MoveFiles)
file.POST("/link-to-datasets", r.fileHandler.LinkToDatasets)
file.GET("/:id/ancestors", r.fileHandler.GetFileAncestors)
file.GET("/:id/parent", r.fileHandler.GetParentFolder)
file.GET("/:id", r.fileHandler.Download)

View File

@@ -492,6 +492,50 @@ func (s *DocumentService) deleteDocumentFull(docID string) error {
return nil
}
// RemoveDocumentKeepFile removes a document's chunks/metadata and the document
// row, decrementing the KB counters (doc_num/chunk_num/token_num), WITHOUT
// deleting the underlying file record, its storage blob, or its file2document
// mappings. Mirrors Python DocumentService.remove_document — the caller is
// responsible for cleaning up the file2document mappings separately.
func (s *DocumentService) RemoveDocumentKeepFile(docID string) error {
doc, kb, err := s.resolveDocAndKB(docID)
if err != nil {
return err
}
if _, delErr := s.taskDAO.DeleteByDocIDs([]string{docID}); delErr != nil {
common.Logger.Warn(fmt.Sprintf("RemoveDocumentKeepFile: failed to delete tasks for %s: %v", docID, delErr))
}
s.deleteDocEngineData(docID, kb.TenantID, doc.KbID)
return s.deleteDocRecordWithCounters(doc, kb.ID)
}
// InsertDocument creates a document row and increments the owning KB's doc_num
// counter in a single transaction. Mirrors Python DocumentService.insert, which
// updates dataset/document counters on insert. The document's ID and timestamps
// are populated by the caller / model hooks before insertion.
func (s *DocumentService) InsertDocument(doc *entity.Document) error {
return dao.DB.Transaction(func(tx *gorm.DB) error {
if err := tx.Create(doc).Error; err != nil {
return fmt.Errorf("failed to create document: %w", err)
}
// Guard the counter bump with RowsAffected: documents.kb_id has no DB-level
// FK, so Create can succeed against a non-existent KB and the Update would
// then report a nil error with 0 rows touched, silently desyncing doc_num.
// Roll the whole transaction back in that case (mirrors the counter checks
// in deleteDocRecordWithCounters).
result := tx.Model(&entity.Knowledgebase{}).
Where("id = ?", doc.KbID).
Update("doc_num", gorm.Expr("doc_num + 1"))
if result.Error != nil {
return fmt.Errorf("failed to increment doc_num for KB %s: %w", doc.KbID, result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("knowledgebase %s not found", doc.KbID)
}
return nil
})
}
// resolveDocAndKB loads the document and its knowledgebase, returning both or
// an error.
func (s *DocumentService) resolveDocAndKB(docID string) (*entity.Document, *entity.Knowledgebase, error) {

View File

@@ -0,0 +1,336 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package service
import (
"errors"
"path/filepath"
"strings"
"go.uber.org/zap"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
)
// Sentinel errors returned by File2DocumentService. Handlers map these to
// Python-compatible response codes/messages. Returning sentinels (instead of
// wrapped DAO/runtime errors) prevents internal DB details from leaking through
// the API response path.
var (
// ErrLinkFileNotFound mirrors Python "File not found!".
ErrLinkFileNotFound = errors.New("File not found!")
// ErrLinkDatasetNotFound mirrors Python "Can't find this dataset!".
ErrLinkDatasetNotFound = errors.New("Can't find this dataset!")
// ErrLinkNoAuthorization mirrors Python "No authorization.".
ErrLinkNoAuthorization = errors.New("No authorization.")
// ErrLinkInternal is a generic, safe-to-expose internal failure.
ErrLinkInternal = errors.New("Internal server error.")
)
// File2DocumentService handles linking files to datasets.
type File2DocumentService struct {
fileDAO *dao.FileDAO
file2DocumentDAO *dao.File2DocumentDAO
kbDAO *dao.KnowledgebaseDAO
userTenantDAO *dao.UserTenantDAO
documentSvc *DocumentService
}
// NewFile2DocumentService creates a File2DocumentService.
func NewFile2DocumentService() *File2DocumentService {
return &File2DocumentService{
fileDAO: dao.NewFileDAO(),
file2DocumentDAO: dao.NewFile2DocumentDAO(),
kbDAO: dao.NewKnowledgebaseDAO(),
userTenantDAO: dao.NewUserTenantDAO(),
documentSvc: NewDocumentService(),
}
}
// LinkToDatasetsRequest is the body for POST /files/link-to-datasets.
type LinkToDatasetsRequest struct {
FileIDs []string `json:"file_ids"`
KbIDs []string `json:"kb_ids"`
}
// LinkToDatasets validates inputs, expands folders, checks permissions, and
// schedules convertFiles in a goroutine — mirroring Python convert().
// Returns immediately (fire-and-forget for the heavy DB work).
//
// On validation failure it returns a sentinel error (see ErrLink* above) so the
// handler can map it to a Python-compatible response without leaking internals.
func (s *File2DocumentService) LinkToDatasets(userID string, req *LinkToDatasetsRequest) error {
// ── 1. Validate files exist ───────────────────────────────────────────────
files, err := s.fileDAO.GetByIDs(req.FileIDs)
if err != nil {
common.Warn("LinkToDatasets: GetByIDs failed", zap.Error(err))
return ErrLinkInternal
}
filesSet := make(map[string]*entity.File, len(files))
for _, f := range files {
filesSet[f.ID] = f
}
for _, id := range req.FileIDs {
if filesSet[id] == nil {
return ErrLinkFileNotFound
}
}
// ── 2. Validate KBs exist ────────────────────────────────────────────────
kbMap := make(map[string]*entity.Knowledgebase, len(req.KbIDs))
for _, kbID := range req.KbIDs {
kb, err := s.kbDAO.GetByID(kbID)
if err != nil || kb == nil {
return ErrLinkDatasetNotFound
}
kbMap[kbID] = kb
}
// ── 3. Expand folders to leaf files, then deduplicate ─────────────────────
// Mixed folder + direct file inputs (or overlapping folders) can yield the
// same leaf file more than once; dedupe so each file is converted exactly
// once.
expanded := make([]string, 0, len(req.FileIDs))
for _, id := range req.FileIDs {
file := filesSet[id]
if file.Type == FileTypeFolder {
inner, err := s.getAllInnermostFileIDs(id)
if err != nil {
common.Warn("LinkToDatasets: folder expansion failed", zap.String("fileID", id), zap.Error(err))
return ErrLinkInternal
}
expanded = append(expanded, inner...)
} else {
expanded = append(expanded, id)
}
}
allFileIDs := dedupeStrings(expanded)
// ── 4. Validate expanded file permissions ─────────────────────────────────
for _, id := range allFileIDs {
file, err := s.fileDAO.GetByID(id)
if err != nil || file == nil {
return ErrLinkFileNotFound
}
if !s.checkFileTeamPermission(file, userID) {
return ErrLinkNoAuthorization
}
}
// ── 5. Validate KB permissions ────────────────────────────────────────────
for _, kb := range kbMap {
if !s.checkKBTeamPermission(kb, userID) {
return ErrLinkNoAuthorization
}
}
// ── 6. Run conversion in background (fire-and-forget) ────────────────────
kbIDs := req.KbIDs
go func() {
if err := s.convertFiles(allFileIDs, kbIDs, userID); err != nil {
common.Warn("file2document.convertFiles failed",
zap.Strings("file_ids", allFileIDs),
zap.Strings("kb_ids", kbIDs),
zap.Error(err))
}
}()
return nil
}
// convertFiles mirrors Python _convert_files: for each file, remove existing
// documents (routing through DocumentService so KB counters are updated), drop
// the file2document mappings, then create a new document in each target KB and
// a fresh mapping.
func (s *File2DocumentService) convertFiles(fileIDs, kbIDs []string, userID string) error {
for _, fileID := range fileIDs {
// Remove existing documents linked to this file. Routing through
// DocumentService.RemoveDocumentKeepFile ensures KB doc_num/chunk_num/
// token_num counters are decremented (mirrors Python remove_document)
// while preserving the file record itself for re-linking.
mappings, err := s.file2DocumentDAO.GetByFileID(fileID)
if err != nil {
common.Warn("convertFiles: GetByFileID failed", zap.String("fileID", fileID), zap.Error(err))
}
for _, m := range mappings {
if m.DocumentID == nil {
continue
}
if err := s.documentSvc.RemoveDocumentKeepFile(*m.DocumentID); err != nil {
common.Warn("convertFiles: RemoveDocumentKeepFile failed",
zap.String("docID", *m.DocumentID), zap.Error(err))
}
}
// Drop the file2document mappings for this file (mirrors Python
// File2DocumentService.delete_by_file_id, done once per file).
if err := s.file2DocumentDAO.DeleteByFileID(fileID); err != nil {
common.Warn("convertFiles: DeleteByFileID failed", zap.String("fileID", fileID), zap.Error(err))
}
// Reload the source file.
file, err := s.fileDAO.GetByID(fileID)
if err != nil || file == nil {
continue
}
// Create a document + mapping in each target KB.
for _, kbID := range kbIDs {
kb, err := s.kbDAO.GetByID(kbID)
if err != nil || kb == nil {
continue
}
parserID := getParser(file.Type, file.Name, kb.ParserID)
suffix := strings.TrimPrefix(filepath.Ext(file.Name), ".")
doc := &entity.Document{
ID: common.GenerateUUID(),
KbID: kb.ID,
ParserID: parserID,
ParserConfig: kb.ParserConfig,
CreatedBy: userID,
Type: file.Type,
Name: &file.Name,
Suffix: suffix,
Size: file.Size,
}
if file.Location != nil {
doc.Location = file.Location
}
if kb.PipelineID != nil {
doc.PipelineID = kb.PipelineID
}
// InsertDocument creates the row and increments KB doc_num in one
// transaction, so a failed insert never leaves a stale counter.
if err := s.documentSvc.InsertDocument(doc); err != nil {
common.Warn("convertFiles: InsertDocument failed",
zap.String("kbID", kbID), zap.String("fileID", fileID), zap.Error(err))
continue
}
mapping := &entity.File2Document{
ID: common.GenerateUUID(),
FileID: &fileID,
DocumentID: &doc.ID,
}
if err := s.file2DocumentDAO.Create(mapping); err != nil {
common.Warn("convertFiles: Create file2document mapping failed",
zap.String("fileID", fileID), zap.String("docID", doc.ID), zap.Error(err))
}
}
}
return nil
}
// getAllInnermostFileIDs recursively collects all non-folder file IDs under a folder.
// Mirrors Python FileService.get_all_innermost_file_ids.
func (s *File2DocumentService) getAllInnermostFileIDs(folderID string) ([]string, error) {
children, err := s.fileDAO.ListByParentID(folderID)
if err != nil {
return nil, err
}
var ids []string
for _, child := range children {
if child.Type == FileTypeFolder {
sub, err := s.getAllInnermostFileIDs(child.ID)
if err != nil {
return nil, err
}
ids = append(ids, sub...)
} else {
ids = append(ids, child.ID)
}
}
return ids, nil
}
// checkFileTeamPermission mirrors Python check_file_team_permission:
// true when file.TenantID == userID or user is in the file tenant's team.
func (s *File2DocumentService) checkFileTeamPermission(file *entity.File, userID string) bool {
if file.TenantID == userID {
return true
}
tenants, err := s.userTenantDAO.GetByUserID(userID)
if err != nil {
return false
}
for _, t := range tenants {
if t.TenantID == file.TenantID {
return true
}
}
return false
}
// checkKBTeamPermission mirrors Python check_kb_team_permission:
// true when kb.TenantID == userID or user is in the KB tenant's team.
func (s *File2DocumentService) checkKBTeamPermission(kb *entity.Knowledgebase, userID string) bool {
if kb.TenantID == userID {
return true
}
tenants, err := s.userTenantDAO.GetByUserID(userID)
if err != nil {
return false
}
for _, t := range tenants {
if t.TenantID == kb.TenantID {
return true
}
}
return false
}
// getParser maps (fileType, fileName, kbParserID) → a parser ID.
// Mirrors Python FileService.get_parser — falls back to the KB's parser.
func getParser(fileType, fileName, kbParserID string) string {
ext := strings.ToLower(strings.TrimPrefix(filepath.Ext(fileName), "."))
switch ext {
case "pdf":
return "pdf"
case "doc", "docx":
return "naive"
case "ppt", "pptx":
return "presentation"
case "xls", "xlsx":
return "table"
case "txt", "md":
return "naive"
case "png", "jpg", "jpeg", "gif", "bmp", "webp":
return "picture"
}
if kbParserID != "" {
return kbParserID
}
return "naive"
}
// dedupeStrings returns the input slice with duplicates removed, preserving the
// first-seen order.
func dedupeStrings(in []string) []string {
seen := make(map[string]struct{}, len(in))
out := make([]string, 0, len(in))
for _, v := range in {
if _, ok := seen[v]; ok {
continue
}
seen[v] = struct{}{}
out = append(out, v)
}
return out
}