feat: migrate POST /api/v1/datasets/<dataset_id>/documents/stop to Go (#15597)

## Summary

Migrate the stop parse documents endpoint from Python to Go.

### Python endpoint
`POST /api/v1/datasets/<dataset_id>/documents/stop` —
`api/apps/restful_apis/document_api.py:1542-1641`

### Changes
| File | Change |
|------|--------|
| `internal/dao/task.go` | Add `GetByDocID` method |
| `internal/dao/task_test.go` | 3 DAO tests (new file) |
| `internal/service/document.go` | Add `StopParseDocuments` + refactor
shared helpers |
| `internal/service/document_test.go` | 8 service tests |
| `internal/handler/document.go` | Add handler + request struct +
interface |
| `internal/handler/document_test.go` | 5 handler tests |
| `internal/router/router.go` | Add `POST /:dataset_id/documents/stop`
route |

### How it works
1. Validates all document IDs belong to the dataset
2. For each document in RUNNING/CANCEL state (or with unfinished tasks):
- Sets Redis cancel signal `{task_id}-cancel` for each associated task
   - Updates `document.run` to CANCEL ("2")
3. Returns `{"success_count": N, "errors": [...]}`

### Test strategy
- **DAO/Service**: SQLite in-memory DB, zero mocks. Redis is nil-safe by
design.
- **Handler**: `fakeDocumentService` implementing `documentServiceIface`
interface.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
This commit is contained in:
Jack
2026-06-04 14:16:13 +08:00
committed by GitHub
parent 5db1b296fb
commit c6eee09ed3
10 changed files with 1027 additions and 80 deletions

View File

@@ -32,11 +32,14 @@ check_cpp_deps() {
command -v cmake >/dev/null 2>&1 || { echo -e "${RED}Error: cmake is required but not installed.${NC}"; exit 1; }
command -v g++ >/dev/null 2>&1 || { echo -e "${RED}Error: g++ is required but not installed.${NC}"; exit 1; }
# Check for pcre2 library
if [ -f "/usr/lib/x86_64-linux-gnu/libpcre2-8.a" ] || [ -f "/usr/local/lib/libpcre2-8.a" ]; then
# Check for pcre2 library (static .a or shared .so; -lpcre2-8 finds either)
if [ -f "/usr/lib/x86_64-linux-gnu/libpcre2-8.a" ] \
|| [ -f "/usr/lib/x86_64-linux-gnu/libpcre2-8.so" ] \
|| [ -f "/usr/local/lib/libpcre2-8.a" ] \
|| [ -f "/usr/local/lib/libpcre2-8.so" ]; then
echo "✓ pcre2 library found"
else
echo -e "${YELLOW}Warning: libpcre2-8.a not found. You may need to install libpcre2-dev:${NC}"
echo -e "${YELLOW}Warning: libpcre2-8 not found. You may need to install libpcre2-dev:${NC}"
echo " sudo apt-get install libpcre2-dev"
fi
@@ -85,17 +88,19 @@ build_go() {
fi
# Check for pcre2 library — known Linux paths + macOS Homebrew (Apple Silicon
# at /opt/homebrew, Intel Macs at /usr/local).
# at /opt/homebrew, Intel Macs at /usr/local). Checks both .a and .so.
if [ -f "/usr/lib/x86_64-linux-gnu/libpcre2-8.a" ] \
|| [ -f "/usr/lib/x86_64-linux-gnu/libpcre2-8.so" ] \
|| [ -f "/usr/local/lib/libpcre2-8.a" ] \
|| [ -f "/usr/local/lib/libpcre2-8.so" ] \
|| [ -f "/opt/homebrew/lib/libpcre2-8.a" ]; then
echo "✓ pcre2 library found"
else
if [ "$(uname)" = "Darwin" ]; then
echo -e "${RED}Error: libpcre2-8.a not found. Install with: brew install pcre2${NC}"
echo -e "${RED}Error: libpcre2-8 not found. Install with: brew install pcre2${NC}"
exit 1
fi
echo -e "${YELLOW}Warning: libpcre2-8.a not found. You may need to install libpcre2-dev:${NC}"
echo -e "${YELLOW}Warning: libpcre2-8 not found. You may need to install libpcre2-dev:${NC}"
sudo apt -y install libpcre2-dev
fi

View File

@@ -67,9 +67,10 @@ func (dao *DocumentDAO) UpdateByID(id string, updates map[string]interface{}) er
return DB.Model(&entity.Document{}).Where("id = ?", id).Updates(updates).Error
}
// Delete delete document
func (dao *DocumentDAO) Delete(id string) error {
return DB.Delete(&entity.Document{}, "id = ?", id).Error
// Delete hard-deletes document by ID. Returns rows affected.
func (dao *DocumentDAO) Delete(id string) (int64, error) {
result := DB.Where("id = ?", id).Delete(&entity.Document{})
return result.RowsAffected, result.Error
}
// List list documents

View File

@@ -58,6 +58,13 @@ func (dao *TaskDAO) DeleteByTenantID(tenantID string) (int64, error) {
return result.RowsAffected, result.Error
}
// GetByDocID gets all tasks by document ID
func (dao *TaskDAO) GetByDocID(docID string) ([]*entity.Task, error) {
var tasks []*entity.Task
err := DB.Where("doc_id = ?", docID).Find(&tasks).Error
return tasks, err
}
func (dao *TaskDAO) GetAllTasks() ([]*entity.Task, error) {
var tasks []*entity.Task
err := DB.Find(&tasks).Error

125
internal/dao/task_test.go Normal file
View File

@@ -0,0 +1,125 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package dao
import (
"testing"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
"ragflow/internal/entity"
)
// setupTaskTestDB initializes an in-memory SQLite database for Task DAO tests.
func setupTaskTestDB(t *testing.T) *gorm.DB {
t.Helper()
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{
TranslateError: true,
})
if err != nil {
t.Fatalf("failed to open sqlite: %v", err)
}
// Migrate task table (Task depends on Document for the doc_id FK,
// but SQLite doesn't enforce FKs by default)
if err := db.AutoMigrate(&entity.Task{}); err != nil {
t.Fatalf("failed to migrate: %v", err)
}
return db
}
func TestGetByDocID_FindsTasks(t *testing.T) {
db := setupTaskTestDB(t)
orig := DB
DB = db
t.Cleanup(func() { DB = orig })
dao := NewTaskDAO()
// Insert tasks for two different documents
task1 := &entity.Task{ID: "task-1", DocID: "doc-1"}
task2 := &entity.Task{ID: "task-2", DocID: "doc-1"}
task3 := &entity.Task{ID: "task-3", DocID: "doc-2"}
for _, tk := range []*entity.Task{task1, task2, task3} {
if err := dao.Create(tk); err != nil {
t.Fatalf("failed to create task: %v", err)
}
}
tasks, err := dao.GetByDocID("doc-1")
if err != nil {
t.Fatalf("GetByDocID failed: %v", err)
}
if len(tasks) != 2 {
t.Fatalf("expected 2 tasks for doc-1, got %d", len(tasks))
}
// Verify task IDs match
ids := make(map[string]bool)
for _, tk := range tasks {
ids[tk.ID] = true
}
if !ids["task-1"] || !ids["task-2"] {
t.Fatalf("expected task-1 and task-2, got %v", ids)
}
}
func TestGetByDocID_NoTasks(t *testing.T) {
db := setupTaskTestDB(t)
orig := DB
DB = db
t.Cleanup(func() { DB = orig })
dao := NewTaskDAO()
tasks, err := dao.GetByDocID("nonexistent")
if err != nil {
t.Fatalf("GetByDocID failed: %v", err)
}
if len(tasks) != 0 {
t.Fatalf("expected 0 tasks, got %d", len(tasks))
}
}
func TestGetByDocID_EmptyDocID(t *testing.T) {
db := setupTaskTestDB(t)
orig := DB
DB = db
t.Cleanup(func() { DB = orig })
dao := NewTaskDAO()
// Insert a task with empty doc_id to verify edge case
task := &entity.Task{ID: "task-1", DocID: ""}
if err := dao.Create(task); err != nil {
t.Fatalf("failed to create task: %v", err)
}
tasks, err := dao.GetByDocID("")
if err != nil {
t.Fatalf("GetByDocID failed: %v", err)
}
if len(tasks) != 1 {
t.Fatalf("expected 1 task for empty doc_id, got %d", len(tasks))
}
if tasks[0].ID != "task-1" {
t.Fatalf("expected task-1, got %s", tasks[0].ID)
}
}

View File

@@ -44,6 +44,7 @@ type documentServiceIface interface {
DeleteDocument(id string) error
DeleteDocuments(ids []string, deleteAll bool, datasetID, userID string) (int, error)
ParseDocuments(datasetID, userID string, docIDs []string) ([]*service.ParseDocumentResponse, error)
StopParseDocuments(datasetID string, docIDs []string) (map[string]interface{}, error)
ListDocuments(page, pageSize int) ([]*service.DocumentResponse, int64, error)
ListDocumentsByDatasetID(kbID string, page, pageSize int) ([]*entity.DocumentListItem, int64, error)
GetDocumentsByAuthorID(authorID, page, pageSize int) ([]*service.DocumentResponse, int64, error)
@@ -842,3 +843,46 @@ func (h *DocumentHandler) ParseDocuments(c *gin.Context) {
"data": parseResult,
})
}
type StopParseDocumentRequest struct {
DocumentIDs []string `json:"document_ids" binding:"required"`
}
func (h *DocumentHandler) StopParseDocuments(c *gin.Context) {
datasetID := c.Param("dataset_id")
var req StopParseDocumentRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusOK, gin.H{
"code": common.CodeBadRequest,
"message": err.Error(),
})
return
}
if len(req.DocumentIDs) == 0 {
c.JSON(http.StatusOK, gin.H{
"code": common.CodeBadRequest,
"message": "`document_ids` is required",
})
return
}
userID := c.GetString("user_id")
if !h.datasetService.Accessible(datasetID, userID) {
jsonError(c, common.CodeAuthenticationError, "You don't own the dataset.")
return
}
result, err := h.documentService.StopParseDocuments(datasetID, req.DocumentIDs)
if err != nil {
jsonError(c, common.CodeExceptionError, err.Error())
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"message": "success",
"data": result,
})
}

View File

@@ -24,17 +24,22 @@ import (
"strings"
"testing"
"github.com/glebarez/sqlite"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
"ragflow/internal/service"
)
// fakeDocumentService implements documentServiceIface for handler tests.
type fakeDocumentService struct {
deleted int
err error
deleted int
err error
stopResult map[string]interface{}
stopErr error
}
func (f *fakeDocumentService) CreateDocument(req *service.CreateDocumentRequest) (*entity.Document, error) {
@@ -55,6 +60,9 @@ func (f *fakeDocumentService) DeleteDocuments(ids []string, deleteAll bool, data
func (f *fakeDocumentService) ParseDocuments(datasetID, userID string, docIDs []string) ([]*service.ParseDocumentResponse, error) {
return nil, nil
}
func (f *fakeDocumentService) StopParseDocuments(datasetID string, docIDs []string) (map[string]interface{}, error) {
return f.stopResult, f.stopErr
}
func (f *fakeDocumentService) ListDocuments(page, pageSize int) ([]*service.DocumentResponse, int64, error) {
return nil, 0, nil
}
@@ -244,3 +252,193 @@ func TestDeleteDocumentsHandler_MissingDatasetID(t *testing.T) {
t.Fatal("expected error for missing dataset_id")
}
}
func TestStopParseDocumentsHandler_EmptyDocIDs(t *testing.T) {
gin.SetMode(gin.TestMode)
fake := &fakeDocumentService{}
h := &DocumentHandler{
documentService: fake,
datasetService: service.NewDatasetService(),
}
c, w := setupGinContextWithUser("POST", "/api/v1/datasets/ds-1/documents/stop", `{"document_ids": []}`)
c.Params = gin.Params{{Key: "dataset_id", Value: "ds-1"}}
h.StopParseDocuments(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
code, _ := resp["code"].(float64)
if code == float64(common.CodeSuccess) {
t.Fatal("expected error for empty document_ids")
}
}
func TestStopParseDocumentsHandler_BadJSON(t *testing.T) {
gin.SetMode(gin.TestMode)
fake := &fakeDocumentService{}
h := &DocumentHandler{
documentService: fake,
datasetService: service.NewDatasetService(),
}
c, w := setupGinContextWithUser("POST", "/api/v1/datasets/ds-1/documents/stop", `not json`)
c.Params = gin.Params{{Key: "dataset_id", Value: "ds-1"}}
h.StopParseDocuments(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
code, _ := resp["code"].(float64)
if code == float64(common.CodeSuccess) {
t.Fatal("expected error for bad JSON body")
}
}
// setupHandlerAccessDB sets up SQLite in-memory DB for handler tests that need
// datasetService.Accessible to work.
func setupHandlerAccessDB(t *testing.T) *gorm.DB {
t.Helper()
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{
TranslateError: true,
})
if err != nil {
t.Fatalf("failed to open sqlite: %v", err)
}
if err := db.AutoMigrate(
&entity.User{},
&entity.Tenant{},
&entity.UserTenant{},
&entity.Knowledgebase{},
); err != nil {
t.Fatalf("failed to migrate: %v", err)
}
// Insert user
db.Create(&entity.User{ID: "user-1", Nickname: "test", Email: "test@test.com", Password: sptr("x")})
// Insert tenant
db.Create(&entity.Tenant{ID: "tenant-1", LLMID: "llm-1", EmbdID: "embd-1", ASRID: "asr-1"})
// Insert user_tenant mapping
db.Create(&entity.UserTenant{ID: "ut-1", UserID: "user-1", TenantID: "tenant-1", Role: "admin"})
// Insert knowledgebase
db.Create(&entity.Knowledgebase{
ID: "ds-1", TenantID: "tenant-1", Name: "test-kb", EmbdID: "embd-1",
CreatedBy: "user-1", Permission: string(entity.TenantPermissionMe),
Status: sptr(string(entity.StatusValid)),
})
return db
}
// sptr returns a pointer to the given string (copy of service test helper).
func sptr(s string) *string { return &s }
func TestStopParseDocumentsHandler_Success(t *testing.T) {
db := setupHandlerAccessDB(t)
orig := dao.DB
dao.DB = db
t.Cleanup(func() { dao.DB = orig })
gin.SetMode(gin.TestMode)
fake := &fakeDocumentService{
stopResult: map[string]interface{}{"success_count": 1},
}
h := &DocumentHandler{
documentService: fake,
datasetService: service.NewDatasetService(),
}
c, w := setupGinContextWithUser("POST", "/api/v1/datasets/ds-1/documents/stop", `{"document_ids": ["doc-1"]}`)
c.Params = gin.Params{{Key: "dataset_id", Value: "ds-1"}}
h.StopParseDocuments(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["code"] != float64(common.CodeSuccess) {
t.Fatalf("expected code 0, got %v: %v", resp["code"], resp)
}
data := resp["data"].(map[string]interface{})
if data["success_count"] != float64(1) {
t.Fatalf("expected success_count=1, got %v", data["success_count"])
}
}
func TestStopParseDocumentsHandler_ServiceError(t *testing.T) {
db := setupHandlerAccessDB(t)
orig := dao.DB
dao.DB = db
t.Cleanup(func() { dao.DB = orig })
gin.SetMode(gin.TestMode)
fake := &fakeDocumentService{
stopErr: fmt.Errorf("internal failure"),
}
h := &DocumentHandler{
documentService: fake,
datasetService: service.NewDatasetService(),
}
c, w := setupGinContextWithUser("POST", "/api/v1/datasets/ds-1/documents/stop", `{"document_ids": ["doc-1"]}`)
c.Params = gin.Params{{Key: "dataset_id", Value: "ds-1"}}
h.StopParseDocuments(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
code, _ := resp["code"].(float64)
if code == float64(common.CodeSuccess) {
t.Fatal("expected error code for service error")
}
}
func TestStopParseDocumentsHandler_NotAccessible(t *testing.T) {
db := setupHandlerAccessDB(t)
orig := dao.DB
dao.DB = db
t.Cleanup(func() { dao.DB = orig })
gin.SetMode(gin.TestMode)
fake := &fakeDocumentService{}
h := &DocumentHandler{
documentService: fake,
datasetService: service.NewDatasetService(),
}
c, w := setupGinContextWithUser("POST", "/api/v1/datasets/ds-1/documents/stop", `{"document_ids": ["doc-1"]}`)
// Use a user that doesn't have access to ds-1
c.Set("user_id", "other-user")
c.Params = gin.Params{{Key: "dataset_id", Value: "ds-1"}}
h.StopParseDocuments(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
code, _ := resp["code"].(float64)
if code == float64(common.CodeSuccess) {
t.Fatal("expected error for no authorization")
}
}

View File

@@ -234,6 +234,7 @@ func (r *Router) Setup(engine *gin.Engine) {
// Dataset document chunk
datasets.GET("/:dataset_id/documents/:document_id/chunks/:chunk_id", r.chunkHandler.Get)
datasets.POST("/:dataset_id/documents/parse", r.documentHandler.ParseDocuments)
datasets.POST("/:dataset_id/documents/stop", r.documentHandler.StopParseDocuments)
datasets.DELETE("/:dataset_id/documents/:document_id/chunks", r.chunkHandler.RemoveChunks)
}

View File

@@ -28,12 +28,14 @@ import (
"strings"
"time"
"ragflow/internal/cache"
"ragflow/internal/dao"
"ragflow/internal/engine"
"ragflow/internal/server"
"github.com/google/uuid"
"gorm.io/gorm"
)
// DocumentService document service
@@ -221,21 +223,8 @@ func (s *DocumentService) DeleteDocuments(ids []string, deleteAll bool, datasetI
// 4. Validate IDs belong to this dataset (only for explicit ids; deleteAll is already scoped)
if !deleteAll {
docs, err := s.documentDAO.GetByIDs(ids)
if err != nil {
return 0, fmt.Errorf("failed to fetch documents: %w", err)
}
if len(docs) != len(ids) {
return 0, fmt.Errorf("some document IDs not found in dataset %s", datasetID)
}
var invalid []string
for _, d := range docs {
if d.KbID != datasetID {
invalid = append(invalid, d.ID)
}
}
if len(invalid) > 0 {
return 0, fmt.Errorf("These documents do not belong to dataset %s: %v", datasetID, invalid)
if _, err := s.validateDocsInDataset(ids, datasetID); err != nil {
return 0, err
}
}
@@ -252,94 +241,144 @@ func (s *DocumentService) DeleteDocuments(ids []string, deleteAll bool, datasetI
return deleted, nil
}
// deleteDocumentFull performs full document cleanup:
// 1. Delete tasks from DB
// 2. Delete chunks from document engine
// 3. Delete metadata from document engine
// 4. Hard-delete document row + decrement KB counters
// 5. Delete file2document mapping + file record + storage blob
//
// Non-critical failures are tolerated (logged and continue).
// deleteDocumentFull performs full document cleanup. Non-critical failures
// are tolerated (logged and continue). Critical failures (e.g. document or
// KB not found) return an error immediately.
func (s *DocumentService) deleteDocumentFull(docID string) error {
doc, kb, err := s.resolveDocAndKB(docID)
if err != nil {
return err
}
// Delete tasks from DB
if _, delErr := s.taskDAO.DeleteByDocIDs([]string{docID}); delErr != nil {
common.Logger.Warn(fmt.Sprintf("failed to delete tasks for %s: %v", docID, delErr))
}
s.deleteDocEngineData(docID, kb.TenantID, doc.KbID)
if err := s.deleteDocRecordWithCounters(doc, kb.ID); err != nil {
return err
}
s.cleanupFileReferences(docID)
return nil
}
// resolveDocAndKB loads the document and its knowledgebase, returning both or
// an error.
func (s *DocumentService) resolveDocAndKB(docID string) (*entity.Document, *entity.Knowledgebase, error) {
doc, err := s.documentDAO.GetByID(docID)
if err != nil {
return fmt.Errorf("document not found: %w", err)
return nil, nil, fmt.Errorf("document not found: %w", err)
}
kbID := doc.KbID
tokenNum := doc.TokenNum
chunkIDNum := doc.ChunkNum
// Resolve tenant ID for engine index name
kb, err := s.kbDAO.GetByID(kbID)
kb, err := s.kbDAO.GetByID(doc.KbID)
if err != nil {
return fmt.Errorf("knowledgebase not found: %w", err)
return nil, nil, fmt.Errorf("knowledgebase not found: %w", err)
}
tenantID := kb.TenantID
return doc, kb, nil
}
// 1. Delete tasks from DB
if _, delErr := s.taskDAO.DeleteByDocIDs([]string{docID}); delErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: failed to delete tasks for %s: %v", docID, delErr))
// deleteDocEngineData removes chunks and metadata from the document engine.
// No-op when the engine is nil.
func (s *DocumentService) deleteDocEngineData(docID, tenantID, kbID string) {
if s.docEngine == nil {
return
}
// 2. Delete chunks from document engine
if s.docEngine != nil {
ctx := context.Background()
indexName := fmt.Sprintf("ragflow_%s", tenantID)
if _, delErr := s.docEngine.DeleteChunks(ctx, map[string]interface{}{"doc_id": docID}, indexName, kbID); delErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: failed to delete chunks for %s: %v", docID, delErr))
}
ctx := context.Background()
indexName := fmt.Sprintf("ragflow_%s", tenantID)
if _, delErr := s.docEngine.DeleteChunks(ctx, map[string]interface{}{"doc_id": docID}, indexName, kbID); delErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocEngineData: failed to delete chunks for %s: %v", docID, delErr))
}
// 3. Delete metadata from document engine (skip if engine not available)
if s.docEngine != nil && s.metadataSvc != nil {
if s.metadataSvc != nil {
_ = s.DeleteDocumentAllMetadata(docID) // logs internally
}
}
// 4. Hard-delete document + decrement KB counters
if delErr := s.documentDAO.Delete(docID); delErr != nil {
return fmt.Errorf("failed to delete document %s: %w", docID, delErr)
}
if decErr := s.kbDAO.DecreaseDocumentNum(kbID, 1, chunkIDNum, tokenNum); decErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: failed to decrement KB counters for %s: %v", kbID, decErr))
}
// deleteDocRecordWithCounters hard-deletes the document row and decrements the
// KB counters in a single transaction. Counters are only decremented when a
// document row was actually removed (RowsAffected > 0), guarding against
// double-decrement on retries or concurrent deletes.
func (s *DocumentService) deleteDocRecordWithCounters(doc *entity.Document, kbID string) error {
return dao.DB.Transaction(func(tx *gorm.DB) error {
result := tx.Where("id = ?", doc.ID).Delete(&entity.Document{})
if result.Error != nil {
return fmt.Errorf("failed to delete document %s: %w", doc.ID, result.Error)
}
if result.RowsAffected == 0 {
return nil // already deleted by a concurrent request — skip counters
}
// 5. Clean up file2document mapping, file record, and storage blob
decErr := tx.Model(&entity.Knowledgebase{}).
Where("id = ?", kbID).
Updates(map[string]interface{}{
"doc_num": gorm.Expr("doc_num - 1"),
"chunk_num": gorm.Expr("chunk_num - ?", doc.ChunkNum),
"token_num": gorm.Expr("token_num - ?", doc.TokenNum),
}).Error
if decErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocRecordWithCounters: failed to decrement KB %s: %v", kbID, decErr))
}
return nil
})
}
// cleanupFileReferences deletes file2document mappings for docID, and for each
// referenced file, only hard-deletes the file record and its storage blob when
// no other document still references the same file_id.
func (s *DocumentService) cleanupFileReferences(docID string) {
mappings, mapErr := s.file2DocumentDAO.GetByDocumentID(docID)
if mapErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: failed to get file2document mappings for %s: %v", docID, mapErr))
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to get f2d mappings for %s: %v", docID, mapErr))
}
if len(mappings) == 0 {
return
}
// Collect unique file_ids
seen := make(map[string]bool)
var fileIDs []string
for _, m := range mappings {
if m.FileID == nil {
if m.FileID == nil || seen[*m.FileID] {
continue
}
fileID := *m.FileID
// Delete the mapping
if delErr := s.file2DocumentDAO.DeleteByDocumentID(docID); delErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: failed to delete f2d mapping for %s: %v", docID, delErr))
seen[*m.FileID] = true
fileIDs = append(fileIDs, *m.FileID)
}
// Delete all file2document rows for this document
if delErr := s.file2DocumentDAO.DeleteByDocumentID(docID); delErr != nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to delete f2d for %s: %v", docID, delErr))
}
// For each file, only delete the record and blob when no other doc references it
for _, fileID := range fileIDs {
remaining, remErr := s.file2DocumentDAO.GetByFileID(fileID)
if remErr != nil {
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to check remaining f2d for %s: %v", fileID, remErr))
continue
}
// Get file to remove storage blob
if len(remaining) > 0 {
continue
}
fileDAO := dao.NewFileDAO()
file, fErr := fileDAO.GetByID(fileID)
if fErr != nil || file == nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: file not found %s: %v", fileID, fErr))
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: file not found %s: %v", fileID, fErr))
continue
}
// Delete file record
if _, delErr := fileDAO.DeleteByIDs([]string{fileID}); delErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: failed to delete file %s: %v", fileID, delErr))
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to delete file %s: %v", fileID, delErr))
continue // keep the blob so the live file row still has its object
}
// Delete storage blob
if file.Location != nil && *file.Location != "" {
storageImpl := storage.GetStorageFactory().GetStorage()
if storageImpl != nil {
if rmErr := storageImpl.Remove(file.ParentID, *file.Location); rmErr != nil {
common.Logger.Warn(fmt.Sprintf("deleteDocumentFull: failed to remove blob %s/%s: %v", file.ParentID, *file.Location, rmErr))
common.Logger.Warn(fmt.Sprintf("cleanupFileReferences: failed to remove blob %s/%s: %v", file.ParentID, *file.Location, rmErr))
}
}
}
}
return nil
}
// ListDocuments list documents
@@ -478,6 +517,104 @@ func (s *DocumentService) ParseDocuments(datasetID, userID string, docIDs []stri
return responses, nil
}
// StopParseDocuments stops parsing for the given documents in a dataset.
// It sets Redis cancel signals for associated tasks and updates doc.run to CANCEL.
// Returns a map with success_count and optionally errors.
func (s *DocumentService) StopParseDocuments(datasetID string, docIDs []string) (map[string]interface{}, error) {
deduped := common.Deduplicate(docIDs)
if len(deduped) == 0 {
return nil, fmt.Errorf("no document IDs provided")
}
docs, err := s.validateDocsInDataset(deduped, datasetID)
if err != nil {
return nil, err
}
var errors []string
successCount := 0
for _, doc := range docs {
if cancelErr := s.cancelDocParse(doc); cancelErr != nil {
errors = append(errors, cancelErr.Error())
continue
}
successCount++
}
result := map[string]interface{}{"success_count": successCount}
if len(errors) > 0 {
result["errors"] = errors
}
return result, nil
}
// validateDocsInDataset deduplicates IDs, fetches the documents, and ensures
// every document exists and belongs to the given dataset. Returns the resolved
// documents.
func (s *DocumentService) validateDocsInDataset(docIDs []string, datasetID string) ([]*entity.Document, error) {
docs, err := s.documentDAO.GetByIDs(docIDs)
if err != nil {
return nil, fmt.Errorf("failed to fetch documents: %w", err)
}
if len(docs) != len(docIDs) {
return nil, fmt.Errorf("some document IDs not found in dataset %s", datasetID)
}
var invalid []string
for _, d := range docs {
if d.KbID != datasetID {
invalid = append(invalid, d.ID)
}
}
if len(invalid) > 0 {
return nil, fmt.Errorf("these documents do not belong to dataset %s: %v", datasetID, invalid)
}
return docs, nil
}
// cancelDocParse sets Redis cancel signals for the document's active tasks and
// marks the document run status as CANCEL. Returns an error if the document is
// not in a cancellable state or the status update fails.
func (s *DocumentService) cancelDocParse(doc *entity.Document) error {
tasks, taskErr := s.taskDAO.GetByDocID(doc.ID)
if taskErr != nil {
return fmt.Errorf("failed to get tasks for %s: %v", doc.ID, taskErr)
}
hasUnfinishedTask := false
for _, t := range tasks {
if t.Progress < 1 {
hasUnfinishedTask = true
break
}
}
canCancel := false
if doc.Run != nil {
if *doc.Run == string(entity.TaskStatusRunning) || *doc.Run == string(entity.TaskStatusCancel) {
canCancel = true
}
}
if hasUnfinishedTask {
canCancel = true
}
if !canCancel {
return fmt.Errorf("can't stop parsing document that has not started or already completed")
}
// Set Redis cancel signal for each task (best-effort)
redisClient := cache.Get()
for _, t := range tasks {
if redisClient != nil {
redisClient.Set(fmt.Sprintf("%s-cancel", t.ID), "x", 0)
}
}
if upErr := s.documentDAO.UpdateByID(doc.ID, map[string]interface{}{"run": string(entity.TaskStatusCancel)}); upErr != nil {
return fmt.Errorf("failed to update document %s: %v", doc.ID, upErr)
}
return nil
}
// toResponse convert model.Document to DocumentResponse
func (s *DocumentService) toResponse(doc *entity.Document) *DocumentResponse {
createdAt := ""

View File

@@ -243,6 +243,48 @@ func TestDeleteDocumentFull_CleansUpFile2Document(t *testing.T) {
}
}
func TestDeleteDocumentFull_SharedFilePreserved(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 2, 20, 10)
insertTestDoc(t, "doc-1", "kb-1", 10, 5)
insertTestDoc(t, "doc-2", "kb-1", 10, 5)
loc := "shared/blob"
insertTestFile(t, "file-shared", "kb-1", "shared.pdf", &loc)
// Same file linked to TWO documents
insertTestFile2Document(t, "f2d-1", "file-shared", "doc-1")
insertTestFile2Document(t, "f2d-2", "file-shared", "doc-2")
svc := testDocumentService(t)
// Delete doc-1; file-shared should survive because doc-2 still references it
err := svc.deleteDocumentFull("doc-1")
if err != nil {
t.Fatalf("deleteDocumentFull failed: %v", err)
}
// f2d mapping for doc-1 should be gone
f2dDAO := dao.NewFile2DocumentDAO()
mappings, _ := f2dDAO.GetByDocumentID("doc-1")
if len(mappings) != 0 {
t.Fatalf("expected 0 f2d mappings for doc-1, got %d", len(mappings))
}
// file record should still exist (doc-2 still references it)
files, _ := dao.NewFileDAO().GetByIDs([]string{"file-shared"})
if len(files) != 1 {
t.Fatalf("expected 1 file record to survive, got %d", len(files))
}
// f2d mapping for doc-2 should still exist
mappings, _ = f2dDAO.GetByDocumentID("doc-2")
if len(mappings) != 1 {
t.Fatalf("expected 1 f2d mapping for doc-2, got %d", len(mappings))
}
}
func insertUserTenantForAccessCheck(t *testing.T, userID, tenantID string) {
t.Helper()
// Insert user if not exists (email is NOT NULL, password is nullable pointer)
@@ -406,6 +448,206 @@ func TestDeleteDocuments_Deduplicate(t *testing.T) {
}
}
// insertTestDocWithRun inserts a document with the given Run status for StopParseDocuments tests.
func insertTestDocWithRun(t *testing.T, id, kbID, run string, tokenNum, chunkNum int64) {
t.Helper()
doc := &entity.Document{
ID: id,
KbID: kbID,
ParserID: "naive",
ParserConfig: entity.JSONMap{},
TokenNum: tokenNum,
ChunkNum: chunkNum,
Suffix: ".txt",
Status: sptr("1"),
Run: &run,
}
if err := dao.DB.Create(doc).Error; err != nil {
t.Fatalf("insert test doc: %v", err)
}
}
// insertTestTaskWithProgress inserts a task with the given progress value.
func insertTestTaskWithProgress(t *testing.T, id, docID string, progress float64) {
t.Helper()
task := &entity.Task{
ID: id,
DocID: docID,
Progress: progress,
}
if err := dao.DB.Create(task).Error; err != nil {
t.Fatalf("insert test task: %v", err)
}
}
func TestStopParseDocuments_Success(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
insertTestDocWithRun(t, "doc-1", "kb-1", string(entity.TaskStatusRunning), 10, 5)
insertTestTask(t, "task-1", "doc-1")
svc := testDocumentService(t)
result, err := svc.StopParseDocuments("kb-1", []string{"doc-1"})
if err != nil {
t.Fatalf("StopParseDocuments failed: %v", err)
}
sc, ok := result["success_count"].(int)
if !ok {
t.Fatalf("success_count not found or wrong type: %v", result)
}
if sc != 1 {
t.Fatalf("expected success_count=1, got %d", sc)
}
// Verify document run status updated to CANCEL
doc, _ := dao.NewDocumentDAO().GetByID("doc-1")
if doc == nil || doc.Run == nil {
t.Fatal("doc not found or run is nil")
}
if *doc.Run != string(entity.TaskStatusCancel) {
t.Fatalf("expected run=%q, got %q", string(entity.TaskStatusCancel), *doc.Run)
}
}
func TestStopParseDocuments_CancelStatus(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
// Doc is already in CANCEL state — should still be accepted
insertTestDocWithRun(t, "doc-1", "kb-1", string(entity.TaskStatusCancel), 10, 5)
insertTestTask(t, "task-1", "doc-1")
svc := testDocumentService(t)
result, err := svc.StopParseDocuments("kb-1", []string{"doc-1"})
if err != nil {
t.Fatalf("StopParseDocuments failed: %v", err)
}
sc := result["success_count"].(int)
if sc != 1 {
t.Fatalf("expected success_count=1, got %d", sc)
}
}
func TestStopParseDocuments_NotRunningOrCancel(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
// Doc with Run="0" (UNSTART) and no unfinished tasks → cannot cancel
insertTestDocWithRun(t, "doc-1", "kb-1", string(entity.TaskStatusUnstart), 10, 5)
svc := testDocumentService(t)
result, err := svc.StopParseDocuments("kb-1", []string{"doc-1"})
if err != nil {
t.Fatalf("StopParseDocuments failed: %v", err)
}
sc := result["success_count"].(int)
if sc != 0 {
t.Fatalf("expected success_count=0, got %d", sc)
}
errors, ok := result["errors"].([]string)
if !ok || len(errors) == 0 {
t.Fatal("expected errors in result")
}
}
func TestStopParseDocuments_UnfinishedTask(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
// Doc with Run="0" but has an unfinished task (progress < 1) → can cancel
insertTestDocWithRun(t, "doc-1", "kb-1", string(entity.TaskStatusUnstart), 10, 5)
insertTestTaskWithProgress(t, "task-1", "doc-1", 0.0)
svc := testDocumentService(t)
result, err := svc.StopParseDocuments("kb-1", []string{"doc-1"})
if err != nil {
t.Fatalf("StopParseDocuments failed: %v", err)
}
sc := result["success_count"].(int)
if sc != 1 {
t.Fatalf("expected success_count=1 (has unfinished task), got %d", sc)
}
}
func TestStopParseDocuments_WrongDataset(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
insertTestKB(t, "kb-2", "tenant-1", 1, 10, 5)
insertTestDocWithRun(t, "doc-1", "kb-2", string(entity.TaskStatusRunning), 10, 5)
svc := testDocumentService(t)
_, err := svc.StopParseDocuments("kb-1", []string{"doc-1"})
if err == nil {
t.Fatal("expected error for doc not belonging to dataset")
}
}
func TestStopParseDocuments_NotFound(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 0, 0, 0)
svc := testDocumentService(t)
_, err := svc.StopParseDocuments("kb-1", []string{"nonexistent"})
if err == nil {
t.Fatal("expected error for nonexistent document IDs")
}
}
func TestStopParseDocuments_EmptyIDs(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 0, 0, 0)
svc := testDocumentService(t)
_, err := svc.StopParseDocuments("kb-1", []string{})
if err == nil {
t.Fatal("expected error for empty doc IDs")
}
}
func TestStopParseDocuments_Deduplicate(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
insertTestDocWithRun(t, "doc-1", "kb-1", string(entity.TaskStatusRunning), 10, 5)
insertTestTask(t, "task-1", "doc-1")
svc := testDocumentService(t)
result, err := svc.StopParseDocuments("kb-1", []string{"doc-1", "doc-1", "doc-1"})
if err != nil {
t.Fatalf("StopParseDocuments failed: %v", err)
}
// Dedup should result in only 1 success
sc := result["success_count"].(int)
if sc != 1 {
t.Fatalf("expected success_count=1 after dedup, got %d", sc)
}
}
func TestDeleteDocument_DeligatesToFullCleanup(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
@@ -426,3 +668,190 @@ func TestDeleteDocument_DeligatesToFullCleanup(t *testing.T) {
t.Fatal("document should be deleted")
}
}
// --- Sub-method tests ---
func TestResolveDocAndKB_Success(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
insertTestDoc(t, "doc-1", "kb-1", 10, 5)
svc := testDocumentService(t)
doc, kb, err := svc.resolveDocAndKB("doc-1")
if err != nil {
t.Fatalf("resolveDocAndKB: %v", err)
}
if doc.ID != "doc-1" {
t.Fatalf("doc ID mismatch: %s", doc.ID)
}
if kb.ID != "kb-1" {
t.Fatalf("kb ID mismatch: %s", kb.ID)
}
if kb.TenantID != "tenant-1" {
t.Fatalf("tenant ID mismatch: %s", kb.TenantID)
}
}
func TestResolveDocAndKB_DocNotFound(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
svc := testDocumentService(t)
_, _, err := svc.resolveDocAndKB("nonexistent")
if err == nil {
t.Fatal("expected error for nonexistent doc")
}
}
func TestResolveDocAndKB_KBNotFound(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
// Insert a doc with kb_id that has no KB row
d := &entity.Document{
ID: "orphan-doc", KbID: "no-such-kb", ParserID: "naive",
ParserConfig: entity.JSONMap{}, Suffix: ".txt", Status: sptr("1"),
}
if err := dao.DB.Create(d).Error; err != nil {
t.Fatalf("insert doc: %v", err)
}
svc := testDocumentService(t)
_, _, err := svc.resolveDocAndKB("orphan-doc")
if err == nil {
t.Fatal("expected error for nonexistent KB")
}
}
func TestDeleteDocRecordWithCounters_Success(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 3, 100, 50)
insertTestDoc(t, "doc-1", "kb-1", 30, 10)
doc, _ := dao.NewDocumentDAO().GetByID("doc-1")
svc := testDocumentService(t)
err := svc.deleteDocRecordWithCounters(doc, "kb-1")
if err != nil {
t.Fatalf("deleteDocRecordWithCounters: %v", err)
}
// Doc gone
_, err = dao.NewDocumentDAO().GetByID("doc-1")
if err == nil {
t.Fatal("document should be deleted")
}
// Counters decremented
kb, _ := dao.NewKnowledgebaseDAO().GetByID("kb-1")
if kb.DocNum != 2 {
t.Fatalf("doc_num: expected 2, got %d", kb.DocNum)
}
if kb.TokenNum != 70 {
t.Fatalf("token_num: expected 70, got %d", kb.TokenNum)
}
if kb.ChunkNum != 40 {
t.Fatalf("chunk_num: expected 40, got %d", kb.ChunkNum)
}
}
func TestDeleteDocRecordWithCounters_DocAlreadyDeleted(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertTestKB(t, "kb-1", "tenant-1", 1, 10, 5)
insertTestDoc(t, "doc-1", "kb-1", 10, 5)
doc, _ := dao.NewDocumentDAO().GetByID("doc-1")
svc := testDocumentService(t)
// First delete: row removed, counters decremented
if err := svc.deleteDocRecordWithCounters(doc, "kb-1"); err != nil {
t.Fatalf("first delete: %v", err)
}
// Second delete: RowsAffected==0 → counters NOT decremented again
if err := svc.deleteDocRecordWithCounters(doc, "kb-1"); err != nil {
t.Fatalf("second delete should not error: %v", err)
}
// KB counters should be decremented exactly once: 1→0 for doc_num
kb, _ := dao.NewKnowledgebaseDAO().GetByID("kb-1")
if kb.DocNum != 0 {
t.Fatalf("doc_num: expected 0 (decremented once), got %d", kb.DocNum)
}
if kb.TokenNum != 0 {
t.Fatalf("token_num: expected 0, got %d", kb.TokenNum)
}
if kb.ChunkNum != 0 {
t.Fatalf("chunk_num: expected 0, got %d", kb.ChunkNum)
}
}
func TestCleanupFileReferences_NoMappings(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
svc := testDocumentService(t)
// Should not panic with no f2d mappings
svc.cleanupFileReferences("no-mappings")
}
func TestCleanupFileReferences_SingleFileDeleted(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
loc := "blob/path"
insertTestFile(t, "file-1", "kb-1", "test.pdf", &loc)
insertTestFile2Document(t, "f2d-1", "file-1", "doc-1")
svc := testDocumentService(t)
svc.cleanupFileReferences("doc-1")
// f2d gone
mappings, _ := dao.NewFile2DocumentDAO().GetByDocumentID("doc-1")
if len(mappings) != 0 {
t.Fatalf("expected 0 f2d after cleanup, got %d", len(mappings))
}
// file record gone
files, _ := dao.NewFileDAO().GetByIDs([]string{"file-1"})
if len(files) != 0 {
t.Fatalf("expected 0 files after cleanup, got %d", len(files))
}
}
func TestCleanupFileReferences_SharedFileSurvives(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
loc := "shared/blob"
insertTestFile(t, "file-shared", "kb-1", "shared.pdf", &loc)
insertTestFile2Document(t, "f2d-1", "file-shared", "doc-1")
insertTestFile2Document(t, "f2d-2", "file-shared", "doc-2")
svc := testDocumentService(t)
svc.cleanupFileReferences("doc-1")
// f2d for doc-1 gone
mappings, _ := dao.NewFile2DocumentDAO().GetByDocumentID("doc-1")
if len(mappings) != 0 {
t.Fatalf("expected 0 f2d for doc-1, got %d", len(mappings))
}
// file record survives
files, _ := dao.NewFileDAO().GetByIDs([]string{"file-shared"})
if len(files) != 1 {
t.Fatalf("expected 1 file record, got %d", len(files))
}
// f2d for doc-2 survives
mappings, _ = dao.NewFile2DocumentDAO().GetByDocumentID("doc-2")
if len(mappings) != 1 {
t.Fatalf("expected 1 f2d for doc-2, got %d", len(mappings))
}
}

View File

@@ -626,7 +626,7 @@ func (s *FileService) deleteSingleFile(ctx context.Context, file *entity.File) e
}
// Delete document record
if err := documentDAO.Delete(docID); err != nil {
if _, err := documentDAO.Delete(docID); err != nil {
common.Logger.Error(fmt.Sprintf("Fail to delete document: %s, error: %v", docID, err))
}
}