diff --git a/internal/handler/datasets.go b/internal/handler/datasets.go index 59b2355e9a..a0636e4792 100644 --- a/internal/handler/datasets.go +++ b/internal/handler/datasets.go @@ -513,7 +513,8 @@ func (h *DatasetsHandler) DeleteKnowledgeGraph(c *gin.Context) { indexName := fmt.Sprintf("ragflow_%s", tenantID) if _, err := docEngine.DeleteChunks(c.Request.Context(), map[string]interface{}{ - "knowledge_graph_kwd": []string{"graph", "subgraph", "entity", "relation", "community_report"}, + "knowledge_graph_kwd": []interface{}{"graph", "subgraph", "entity", "relation", "community_report"}, + "kb_id": datasetID, }, indexName, datasetID); err != nil { jsonInternalError(c, err) return @@ -698,6 +699,47 @@ func (h *DatasetsHandler) TraceIndex(c *gin.Context) { }) } +// DeleteIndex Delete an indexing task (graph/raptor/mindmap) for a dataset. +func (h *DatasetsHandler) DeleteIndex(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + datasetID := strings.TrimSpace(c.Param("dataset_id")) + if datasetID == "" { + jsonError(c, common.CodeDataError, "dataset_id is required") + return + } + + userID := strings.TrimSpace(user.ID) + if userID == "" { + jsonError(c, common.CodeDataError, "user_id is required") + return + } + + indexType := strings.ToLower(strings.TrimSpace(c.Param("index_type"))) + if indexType == "" { + indexType = strings.ToLower(strings.TrimSpace(c.Query("type"))) + } + + wipeArg := strings.ToLower(strings.TrimSpace(c.DefaultQuery("wipe", "true"))) + wipe := true + switch wipeArg { + case "false", "0", "no", "off": + wipe = false + } + + code, err := h.datasetsService.DeleteIndex(userID, datasetID, indexType, wipe) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, map[string]interface{}{}, "success") +} + // ListMetadataFlattened handles GET /api/v1/datasets/metadata/flattened. // @Summary List flattened metadata for datasets // @Description Get flattened metadata for multiple datasets diff --git a/internal/router/router.go b/internal/router/router.go index 608228756c..349d0c329d 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -277,7 +277,9 @@ func (r *Router) Setup(engine *gin.Engine) { datasets.POST("/:dataset_id/documents/batch-update-status", r.documentHandler.BatchUpdateDocumentStatus) datasets.GET("/:dataset_id/index", r.datasetsHandler.TraceIndex) datasets.POST("/:dataset_id/index", r.datasetsHandler.RunIndex) - datasets.DELETE("/:dataset_id/graph", r.datasetsHandler.DeleteKnowledgeGraph) + datasets.DELETE("/:dataset_id/index", r.datasetsHandler.DeleteIndex) + datasets.DELETE("/:dataset_id/:index_type", r.datasetsHandler.DeleteIndex) + //datasets.DELETE("/:dataset_id/graph", r.datasetsHandler.DeleteKnowledgeGraph) datasets.POST("", r.datasetsHandler.CreateDataset) datasets.DELETE("", r.datasetsHandler.DeleteDatasets) datasets.POST("/search", r.datasetsHandler.SearchDatasets) diff --git a/internal/service/dataset.go b/internal/service/dataset.go index ad80850835..0579f1e691 100644 --- a/internal/service/dataset.go +++ b/internal/service/dataset.go @@ -83,6 +83,9 @@ const ( graphRaptorQueueDocID = "graph_raptor_x" maximumTaskPageNumber = int64(100000000) serverQueueNamePrefix = "te" + + graphPhaseResolutionDone = "resolution_done" + graphPhaseCommunityDone = "community_done" ) // DatasetService implements the RESTful dataset APIs from dataset_api.py. @@ -166,6 +169,7 @@ func (s *DatasetService) UpdateDocumentMetadataConfig(userID, datasetID, documen return updatedDoc, common.CodeSuccess, nil } +// checkType reports whether indexType is supported by dataset index APIs. func checkType(indexType string) bool { haveType := false for _, t := range validIndexTypes { @@ -319,6 +323,19 @@ func datasetIndexTaskIDColumn(indexType string) string { } } +func datasetIndexTaskFinishAtColumn(indexType string) string { + switch indexType { + case "graph": + return "graphrag_task_finish_at" + case "raptor": + return "raptor_task_finish_at" + case "mindmap": + return "mindmap_task_finish_at" + default: + return "" + } +} + func checkIndexTaskType(taskType string) bool { switch taskType { case "graphrag", "raptor", "mindmap": @@ -380,6 +397,25 @@ func datasetIndexQueueName(priority int) string { return fmt.Sprintf("%s.%d.common", serverQueueNamePrefix, priority) } +func interfaceSlice(items ...string) []interface{} { + result := make([]interface{}, len(items)) + for i, item := range items { + result[i] = item + } + return result +} + +func clearGraphPhaseMarkers(redisClient *redisengine.RedisClient, datasetID string) { + if redisClient == nil || datasetID == "" { + return + } + for _, phase := range []string{graphPhaseResolutionDone, graphPhaseCommunityDone} { + if !redisClient.Delete(fmt.Sprintf("graphrag:phase:%s:%s", datasetID, phase)) { + common.Warn("Failed to clear GraphRAG phase marker", zap.String("dataset_id", datasetID), zap.String("phase", phase)) + } + } +} + // RunIndex Run an indexing task (graph/raptor/mindmap) for a dataset. func (s *DatasetService) RunIndex(userID, datasetID, indexType string) (map[string]interface{}, common.ErrorCode, error) { if !checkType(indexType) { @@ -529,6 +565,85 @@ func (s *DatasetService) TraceIndex(datasetID, userID, indexType string) (*entit return task, common.CodeSuccess, nil } +func (s *DatasetService) DeleteIndex(userID, datasetID, indexType string, wipe bool) (common.ErrorCode, error) { + if !checkType(indexType) { + return common.CodeArgumentError, fmt.Errorf("Invalid index type '%s'", indexType) + } + + if datasetID == "" { + return common.CodeDataError, errors.New(`Lack of "Dataset ID"`) + } + + if !s.kbDAO.Accessible(datasetID, userID) { + return common.CodeDataError, errors.New("No authorization.") + } + + kb, err := s.kbDAO.GetByID(datasetID) + if err != nil { + if dao.IsNotFoundErr(err) { + return common.CodeDataError, errors.New("Invalid Dataset ID") + } + return common.CodeDataError, errors.New("Internal server error") + } + + taskIDField := datasetIndexTaskIDColumn(indexType) + taskFinishAtField := datasetIndexTaskFinishAtColumn(indexType) + taskID := datasetIndexTaskID(kb, indexType) + + common.Info("delete_index", zap.String("dataset_id", datasetID), zap.String("index_type", indexType), zap.Bool("wipe", wipe)) + + if taskID != "" { + redisClient := redisengine.Get() + if redisClient == nil || !redisClient.Set(fmt.Sprintf("%s-cancel", taskID), "x", 0) { + common.Warn("Failed to set dataset index cancellation marker", zap.String("dataset_id", datasetID), zap.String("task_id", taskID)) + } + if err := dao.DB.Unscoped().Where("id = ?", taskID).Delete(&entity.Task{}).Error; err != nil { + common.Warn("Failed to delete dataset index task", zap.String("dataset_id", datasetID), zap.String("task_id", taskID), zap.Error(err)) + return common.CodeDataError, errors.New("Internal server error") + } + } + + if wipe && indexType == "graph" { + if s.docEngine == nil { + return common.CodeServerError, errors.New("Document engine is not initialized") + } + indexName := fmt.Sprintf("ragflow_%s", kb.TenantID) + _, err = s.docEngine.DeleteChunks(context.Background(), map[string]interface{}{ + "knowledge_graph_kwd": interfaceSlice("graph", "subgraph", "entity", "relation", "community_report"), + "kb_id": datasetID, + }, indexName, datasetID) + if err != nil { + common.Warn("Failed to delete GraphRAG artefacts", zap.String("dataset_id", datasetID), zap.Error(err)) + return common.CodeDataError, errors.New("Internal server error") + } + clearGraphPhaseMarkers(redisengine.Get(), datasetID) + common.Info("delete_index: cleared GraphRAG artefacts and phase markers", zap.String("dataset_id", datasetID)) + } else if wipe && indexType == "raptor" { + if s.docEngine == nil { + return common.CodeServerError, errors.New("Document engine is not initialized") + } + indexName := fmt.Sprintf("ragflow_%s", kb.TenantID) + _, err = s.docEngine.DeleteChunks(context.Background(), map[string]interface{}{ + "raptor_kwd": interfaceSlice("raptor"), + "kb_id": datasetID, + }, indexName, datasetID) + if err != nil { + common.Warn("Failed to delete RAPTOR artefacts", zap.String("dataset_id", datasetID), zap.Error(err)) + return common.CodeDataError, errors.New("Internal server error") + } + } + + if err := dao.DB.Model(&entity.Knowledgebase{}).Where("id = ?", kb.ID).Updates(map[string]interface{}{ + taskIDField: "", + taskFinishAtField: nil, + }).Error; err != nil { + common.Warn("Failed to clear dataset index task fields", zap.String("dataset_id", datasetID), zap.String("index_type", indexType), zap.Error(err)) + return common.CodeDataError, errors.New("Internal server error") + } + + return common.CodeSuccess, nil +} + // SearchDatasetsRequest is the request structure for searching chunks across datasets. type SearchDatasetsRequest struct { DatasetIDs []string `json:"dataset_ids" binding:"required"` diff --git a/internal/service/dataset_delete_index_test.go b/internal/service/dataset_delete_index_test.go new file mode 100644 index 0000000000..7ab1325f64 --- /dev/null +++ b/internal/service/dataset_delete_index_test.go @@ -0,0 +1,267 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +import ( + "context" + "errors" + "testing" + "time" + + "gorm.io/gorm" + + "ragflow/internal/common" + "ragflow/internal/dao" + "ragflow/internal/entity" +) + +type deleteIndexDocEngine struct { + fakeChatDocEngine + deleteCalls []deleteIndexDocEngineCall +} + +type deleteIndexDocEngineCall struct { + condition map[string]interface{} + indexName string + datasetID string +} + +func (e *deleteIndexDocEngine) DeleteChunks(_ context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) { + e.deleteCalls = append(e.deleteCalls, deleteIndexDocEngineCall{ + condition: condition, + indexName: indexName, + datasetID: datasetID, + }) + return 1, nil +} + +func testDatasetServiceForDeleteIndex(docEngine *deleteIndexDocEngine) *DatasetService { + return &DatasetService{ + kbDAO: dao.NewKnowledgebaseDAO(), + taskDAO: dao.NewTaskDAO(), + docEngine: docEngine, + } +} + +func insertDeleteIndexKB(t *testing.T, indexType string, taskID string) { + t.Helper() + + finishAt := time.Date(2026, 6, 23, 10, 0, 0, 0, time.UTC) + kb := &entity.Knowledgebase{ + ID: "kb-1", + TenantID: "user-1", + Name: "test-kb", + EmbdID: "embedding@OpenAI", + CreatedBy: "user-1", + Permission: string(entity.TenantPermissionMe), + ParserID: "naive", + ParserConfig: entity.JSONMap{}, + Status: sptr("1"), + } + + switch indexType { + case "graph": + kb.GraphragTaskID = &taskID + kb.GraphragTaskFinishAt = &finishAt + case "raptor": + kb.RaptorTaskID = &taskID + kb.RaptorTaskFinishAt = &finishAt + case "mindmap": + kb.MindmapTaskID = &taskID + kb.MindmapTaskFinishAt = &finishAt + } + + if err := dao.DB.Create(kb).Error; err != nil { + t.Fatalf("insert kb: %v", err) + } + if taskID != "" { + if err := dao.DB.Create(&entity.Task{ID: taskID, DocID: "doc-1", TaskType: indexTypeToTaskType[indexType]}).Error; err != nil { + t.Fatalf("insert task: %v", err) + } + } +} + +func TestDatasetServiceDeleteIndexGraphWipeFalseOnlyCancelsTask(t *testing.T) { + db := setupServiceTestDB(t) + pushServiceDB(t, db) + insertDeleteIndexKB(t, "graph", "graph-task") + + docEngine := &deleteIndexDocEngine{} + code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "graph", false) + if err != nil { + t.Fatalf("DeleteIndex failed: %v", err) + } + if code != common.CodeSuccess { + t.Fatalf("expected success code, got %d", code) + } + if len(docEngine.deleteCalls) != 0 { + t.Fatalf("wipe=false should not delete doc-store artefacts, got %#v", docEngine.deleteCalls) + } + + assertDeleteIndexTaskDeleted(t, "graph-task") + kb := getDeleteIndexKB(t) + if kb.GraphragTaskID == nil || *kb.GraphragTaskID != "" { + t.Fatalf("expected graphrag_task_id to be cleared to empty string, got %#v", kb.GraphragTaskID) + } + if kb.GraphragTaskFinishAt != nil { + t.Fatalf("expected graphrag_task_finish_at to be cleared, got %#v", kb.GraphragTaskFinishAt) + } +} + +func TestDatasetServiceDeleteIndexGraphWipeTrueDeletesArtefacts(t *testing.T) { + db := setupServiceTestDB(t) + pushServiceDB(t, db) + insertDeleteIndexKB(t, "graph", "graph-task") + + docEngine := &deleteIndexDocEngine{} + code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "graph", true) + if err != nil { + t.Fatalf("DeleteIndex failed: %v", err) + } + if code != common.CodeSuccess { + t.Fatalf("expected success code, got %d", code) + } + if len(docEngine.deleteCalls) != 1 { + t.Fatalf("expected one doc-store delete call, got %#v", docEngine.deleteCalls) + } + + call := docEngine.deleteCalls[0] + if call.indexName != "ragflow_user-1" || call.datasetID != "kb-1" { + t.Fatalf("unexpected delete target: %#v", call) + } + if call.condition["kb_id"] != "kb-1" { + t.Fatalf("delete condition must include kb_id, got %#v", call.condition) + } + assertStringSet(t, call.condition["knowledge_graph_kwd"], []string{"graph", "subgraph", "entity", "relation", "community_report"}) + assertDeleteIndexTaskDeleted(t, "graph-task") +} + +func TestDatasetServiceDeleteIndexRaptorWipeTrueDeletesRaptorArtefacts(t *testing.T) { + db := setupServiceTestDB(t) + pushServiceDB(t, db) + insertDeleteIndexKB(t, "raptor", "raptor-task") + + docEngine := &deleteIndexDocEngine{} + code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "raptor", true) + if err != nil { + t.Fatalf("DeleteIndex failed: %v", err) + } + if code != common.CodeSuccess { + t.Fatalf("expected success code, got %d", code) + } + if len(docEngine.deleteCalls) != 1 { + t.Fatalf("expected one doc-store delete call, got %#v", docEngine.deleteCalls) + } + call := docEngine.deleteCalls[0] + if call.condition["kb_id"] != "kb-1" { + t.Fatalf("delete condition must include kb_id, got %#v", call.condition) + } + assertStringSet(t, call.condition["raptor_kwd"], []string{"raptor"}) + assertDeleteIndexTaskDeleted(t, "raptor-task") + + kb := getDeleteIndexKB(t) + if kb.RaptorTaskID == nil || *kb.RaptorTaskID != "" { + t.Fatalf("expected raptor_task_id to be cleared to empty string, got %#v", kb.RaptorTaskID) + } + if kb.RaptorTaskFinishAt != nil { + t.Fatalf("expected raptor_task_finish_at to be cleared, got %#v", kb.RaptorTaskFinishAt) + } +} + +func TestDatasetServiceDeleteIndexMindmapDoesNotDeleteDocStore(t *testing.T) { + db := setupServiceTestDB(t) + pushServiceDB(t, db) + insertDeleteIndexKB(t, "mindmap", "mindmap-task") + + docEngine := &deleteIndexDocEngine{} + code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "mindmap", true) + if err != nil { + t.Fatalf("DeleteIndex failed: %v", err) + } + if code != common.CodeSuccess { + t.Fatalf("expected success code, got %d", code) + } + if len(docEngine.deleteCalls) != 0 { + t.Fatalf("mindmap delete should not delete doc-store artefacts, got %#v", docEngine.deleteCalls) + } + assertDeleteIndexTaskDeleted(t, "mindmap-task") + + kb := getDeleteIndexKB(t) + if kb.MindmapTaskID == nil || *kb.MindmapTaskID != "" { + t.Fatalf("expected mindmap_task_id to be cleared to empty string, got %#v", kb.MindmapTaskID) + } + if kb.MindmapTaskFinishAt != nil { + t.Fatalf("expected mindmap_task_finish_at to be cleared, got %#v", kb.MindmapTaskFinishAt) + } +} + +func TestDatasetServiceDeleteIndexRejectsInvalidType(t *testing.T) { + db := setupServiceTestDB(t) + pushServiceDB(t, db) + + code, err := testDatasetServiceForDeleteIndex(&deleteIndexDocEngine{}).DeleteIndex("user-1", "kb-1", "invalid", true) + if err == nil { + t.Fatal("expected invalid index type error") + } + if code != common.CodeArgumentError { + t.Fatalf("expected argument error code, got %d", code) + } +} + +func assertDeleteIndexTaskDeleted(t *testing.T, taskID string) { + t.Helper() + var task entity.Task + err := dao.DB.Where("id = ?", taskID).First(&task).Error + if !errors.Is(err, gorm.ErrRecordNotFound) { + t.Fatalf("expected task %s to be deleted, got err=%v task=%#v", taskID, err, task) + } +} + +func getDeleteIndexKB(t *testing.T) entity.Knowledgebase { + t.Helper() + var kb entity.Knowledgebase + if err := dao.DB.Where("id = ?", "kb-1").First(&kb).Error; err != nil { + t.Fatalf("fetch kb: %v", err) + } + return kb +} + +func assertStringSet(t *testing.T, actual interface{}, expected []string) { + t.Helper() + + items, ok := actual.([]interface{}) + if !ok { + t.Fatalf("expected []interface{}, got %#v", actual) + } + if len(items) != len(expected) { + t.Fatalf("expected %d items, got %#v", len(expected), items) + } + + seen := make(map[string]bool, len(items)) + for _, item := range items { + value, ok := item.(string) + if !ok { + t.Fatalf("expected string item, got %#v", item) + } + seen[value] = true + } + for _, item := range expected { + if !seen[item] { + t.Fatalf("missing %q in %#v", item, items) + } + } +}