feat[Go] implement datasets/<dataset_id>/<index_type> DELETE (#16257)

### What problem does this PR solve?

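As the title says: this PR implements the `DELETE datasets/<dataset_id>/<index_type>` endpoint in the Go server.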

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Haruko386
2026-06-24 14:47:55 +08:00
committed by GitHub
parent 368db6fa58
commit 97718ec779
4 changed files with 428 additions and 2 deletions

View File

@@ -513,7 +513,8 @@ func (h *DatasetsHandler) DeleteKnowledgeGraph(c *gin.Context) {
indexName := fmt.Sprintf("ragflow_%s", tenantID)
if _, err := docEngine.DeleteChunks(c.Request.Context(), map[string]interface{}{
"knowledge_graph_kwd": []string{"graph", "subgraph", "entity", "relation", "community_report"},
"knowledge_graph_kwd": []interface{}{"graph", "subgraph", "entity", "relation", "community_report"},
"kb_id": datasetID,
}, indexName, datasetID); err != nil {
jsonInternalError(c, err)
return
@@ -698,6 +699,47 @@ func (h *DatasetsHandler) TraceIndex(c *gin.Context) {
})
}
// DeleteIndex handles DELETE requests that remove an indexing task
// (graph/raptor/mindmap) from a dataset.
//
// The dataset is taken from the "dataset_id" path parameter. The index type is
// read from the "index_type" path parameter and, when that is empty, from the
// "type" query parameter; it is trimmed and lower-cased before being handed to
// the service. The "wipe" query parameter defaults to "true" and only the
// values "false", "0", "no" and "off" (case-insensitive) turn wiping off.
func (h *DatasetsHandler) DeleteIndex(c *gin.Context) {
	// normalize trims surrounding whitespace and lower-cases a request value.
	normalize := func(raw string) string {
		return strings.ToLower(strings.TrimSpace(raw))
	}

	user, errorCode, errorMessage := GetUser(c)
	if errorCode != common.CodeSuccess {
		jsonError(c, errorCode, errorMessage)
		return
	}

	datasetID := strings.TrimSpace(c.Param("dataset_id"))
	if len(datasetID) == 0 {
		jsonError(c, common.CodeDataError, "dataset_id is required")
		return
	}

	userID := strings.TrimSpace(user.ID)
	if len(userID) == 0 {
		jsonError(c, common.CodeDataError, "user_id is required")
		return
	}

	// Prefer the path segment; fall back to ?type= when the route has none.
	indexType := normalize(c.Param("index_type"))
	if len(indexType) == 0 {
		indexType = normalize(c.Query("type"))
	}

	// Anything other than an explicit "off" spelling keeps wiping enabled.
	wipeFlag := normalize(c.DefaultQuery("wipe", "true"))
	wipe := wipeFlag != "false" && wipeFlag != "0" && wipeFlag != "no" && wipeFlag != "off"

	if code, err := h.datasetsService.DeleteIndex(userID, datasetID, indexType, wipe); err != nil {
		jsonError(c, code, err.Error())
		return
	}
	jsonResponse(c, common.CodeSuccess, map[string]interface{}{}, "success")
}
// ListMetadataFlattened handles GET /api/v1/datasets/metadata/flattened.
// @Summary List flattened metadata for datasets
// @Description Get flattened metadata for multiple datasets

View File

@@ -277,7 +277,9 @@ func (r *Router) Setup(engine *gin.Engine) {
datasets.POST("/:dataset_id/documents/batch-update-status", r.documentHandler.BatchUpdateDocumentStatus)
datasets.GET("/:dataset_id/index", r.datasetsHandler.TraceIndex)
datasets.POST("/:dataset_id/index", r.datasetsHandler.RunIndex)
datasets.DELETE("/:dataset_id/graph", r.datasetsHandler.DeleteKnowledgeGraph)
datasets.DELETE("/:dataset_id/index", r.datasetsHandler.DeleteIndex)
datasets.DELETE("/:dataset_id/:index_type", r.datasetsHandler.DeleteIndex)
//datasets.DELETE("/:dataset_id/graph", r.datasetsHandler.DeleteKnowledgeGraph)
datasets.POST("", r.datasetsHandler.CreateDataset)
datasets.DELETE("", r.datasetsHandler.DeleteDatasets)
datasets.POST("/search", r.datasetsHandler.SearchDatasets)

View File

@@ -83,6 +83,9 @@ const (
graphRaptorQueueDocID = "graph_raptor_x"
maximumTaskPageNumber = int64(100000000)
serverQueueNamePrefix = "te"
graphPhaseResolutionDone = "resolution_done"
graphPhaseCommunityDone = "community_done"
)
// DatasetService implements the RESTful dataset APIs from dataset_api.py.
@@ -166,6 +169,7 @@ func (s *DatasetService) UpdateDocumentMetadataConfig(userID, datasetID, documen
return updatedDoc, common.CodeSuccess, nil
}
// checkType reports whether indexType is supported by dataset index APIs.
func checkType(indexType string) bool {
haveType := false
for _, t := range validIndexTypes {
@@ -319,6 +323,19 @@ func datasetIndexTaskIDColumn(indexType string) string {
}
}
// datasetIndexTaskFinishAtColumn maps an index type ("graph", "raptor" or
// "mindmap") to the name of the column that stores when its task finished.
// Any other index type yields the empty string.
func datasetIndexTaskFinishAtColumn(indexType string) string {
	finishAtColumns := map[string]string{
		"graph":   "graphrag_task_finish_at",
		"raptor":  "raptor_task_finish_at",
		"mindmap": "mindmap_task_finish_at",
	}
	// A missing key yields the zero value "", matching the unknown-type case.
	return finishAtColumns[indexType]
}
func checkIndexTaskType(taskType string) bool {
switch taskType {
case "graphrag", "raptor", "mindmap":
@@ -380,6 +397,25 @@ func datasetIndexQueueName(priority int) string {
return fmt.Sprintf("%s.%d.common", serverQueueNamePrefix, priority)
}
// interfaceSlice converts its string arguments into a []interface{} holding
// the same values in the same order. The result is always non-nil, even when
// no arguments are given.
func interfaceSlice(items ...string) []interface{} {
	boxed := make([]interface{}, 0, len(items))
	for idx := range items {
		boxed = append(boxed, items[idx])
	}
	return boxed
}
// clearGraphPhaseMarkers deletes the "graphrag:phase:<dataset_id>:<phase>"
// Redis keys for the resolution-done and community-done phases of a dataset.
// It does nothing when redisClient is nil or datasetID is empty. A failed
// delete is logged as a warning and does not stop the remaining deletes.
func clearGraphPhaseMarkers(redisClient *redisengine.RedisClient, datasetID string) {
	if redisClient == nil || datasetID == "" {
		return
	}

	phases := [...]string{graphPhaseResolutionDone, graphPhaseCommunityDone}
	for i := range phases {
		markerKey := fmt.Sprintf("graphrag:phase:%s:%s", datasetID, phases[i])
		if redisClient.Delete(markerKey) {
			continue
		}
		common.Warn("Failed to clear GraphRAG phase marker", zap.String("dataset_id", datasetID), zap.String("phase", phases[i]))
	}
}
// RunIndex Run an indexing task (graph/raptor/mindmap) for a dataset.
func (s *DatasetService) RunIndex(userID, datasetID, indexType string) (map[string]interface{}, common.ErrorCode, error) {
if !checkType(indexType) {
@@ -529,6 +565,85 @@ func (s *DatasetService) TraceIndex(datasetID, userID, indexType string) (*entit
return task, common.CodeSuccess, nil
}
// DeleteIndex removes the indexing task of the given type
// (graph/raptor/mindmap) from a dataset.
//
// After validating the index type, the dataset ID and the caller's access to
// the dataset, it performs these steps in order:
//   - if the dataset currently references a task, sets a "<task_id>-cancel"
//     marker in Redis (a failure here is only logged) and removes the task row
//     with an unscoped delete;
//   - if wipe is true, deletes the derived chunks from the document engine for
//     the "graph" and "raptor" index types; for "graph" the GraphRAG phase
//     markers are cleared as well. Other index types wipe nothing here;
//   - sets the dataset's task-ID column to "" and its task-finish-time column
//     to nil.
//
// It returns common.CodeSuccess and a nil error on success; otherwise it
// returns an error code together with the message to report to the caller.
func (s *DatasetService) DeleteIndex(userID, datasetID, indexType string, wipe bool) (common.ErrorCode, error) {
	// Guard clauses are evaluated top to bottom; the first failing one wins.
	switch {
	case !checkType(indexType):
		return common.CodeArgumentError, fmt.Errorf("Invalid index type '%s'", indexType)
	case datasetID == "":
		return common.CodeDataError, errors.New(`Lack of "Dataset ID"`)
	case !s.kbDAO.Accessible(datasetID, userID):
		return common.CodeDataError, errors.New("No authorization.")
	}

	kb, lookupErr := s.kbDAO.GetByID(datasetID)
	if lookupErr != nil {
		if dao.IsNotFoundErr(lookupErr) {
			return common.CodeDataError, errors.New("Invalid Dataset ID")
		}
		return common.CodeDataError, errors.New("Internal server error")
	}

	idColumn := datasetIndexTaskIDColumn(indexType)
	finishAtColumn := datasetIndexTaskFinishAtColumn(indexType)
	taskID := datasetIndexTaskID(kb, indexType)
	common.Info("delete_index", zap.String("dataset_id", datasetID), zap.String("index_type", indexType), zap.Bool("wipe", wipe))

	if taskID != "" {
		// Writing the cancel marker is best effort: a failure is logged and
		// the task row is still deleted.
		cancelKey := fmt.Sprintf("%s-cancel", taskID)
		if client := redisengine.Get(); client == nil || !client.Set(cancelKey, "x", 0) {
			common.Warn("Failed to set dataset index cancellation marker", zap.String("dataset_id", datasetID), zap.String("task_id", taskID))
		}
		if delErr := dao.DB.Unscoped().Where("id = ?", taskID).Delete(&entity.Task{}).Error; delErr != nil {
			common.Warn("Failed to delete dataset index task", zap.String("dataset_id", datasetID), zap.String("task_id", taskID), zap.Error(delErr))
			return common.CodeDataError, errors.New("Internal server error")
		}
	}

	// Only graph and raptor have artefacts in the document engine to wipe.
	isGraph := indexType == "graph"
	if wipe && (isGraph || indexType == "raptor") {
		if s.docEngine == nil {
			return common.CodeServerError, errors.New("Document engine is not initialized")
		}

		condition := map[string]interface{}{"kb_id": datasetID}
		failureMessage := "Failed to delete RAPTOR artefacts"
		if isGraph {
			condition["knowledge_graph_kwd"] = interfaceSlice("graph", "subgraph", "entity", "relation", "community_report")
			failureMessage = "Failed to delete GraphRAG artefacts"
		} else {
			condition["raptor_kwd"] = interfaceSlice("raptor")
		}

		indexName := fmt.Sprintf("ragflow_%s", kb.TenantID)
		if _, wipeErr := s.docEngine.DeleteChunks(context.Background(), condition, indexName, datasetID); wipeErr != nil {
			common.Warn(failureMessage, zap.String("dataset_id", datasetID), zap.Error(wipeErr))
			return common.CodeDataError, errors.New("Internal server error")
		}

		if isGraph {
			clearGraphPhaseMarkers(redisengine.Get(), datasetID)
			common.Info("delete_index: cleared GraphRAG artefacts and phase markers", zap.String("dataset_id", datasetID))
		}
	}

	// Reset the bookkeeping columns so the dataset no longer references a task.
	clearedColumns := map[string]interface{}{
		idColumn:       "",
		finishAtColumn: nil,
	}
	if updateErr := dao.DB.Model(&entity.Knowledgebase{}).Where("id = ?", kb.ID).Updates(clearedColumns).Error; updateErr != nil {
		common.Warn("Failed to clear dataset index task fields", zap.String("dataset_id", datasetID), zap.String("index_type", indexType), zap.Error(updateErr))
		return common.CodeDataError, errors.New("Internal server error")
	}
	return common.CodeSuccess, nil
}
// SearchDatasetsRequest is the request structure for searching chunks across datasets.
type SearchDatasetsRequest struct {
DatasetIDs []string `json:"dataset_ids" binding:"required"`

View File

@@ -0,0 +1,267 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package service
import (
"context"
"errors"
"testing"
"time"
"gorm.io/gorm"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
)
// deleteIndexDocEngine is a document-engine test double that embeds
// fakeChatDocEngine and records the arguments of every DeleteChunks call.
type deleteIndexDocEngine struct {
fakeChatDocEngine
// deleteCalls holds one entry per DeleteChunks invocation, appended in call order.
deleteCalls []deleteIndexDocEngineCall
}
// deleteIndexDocEngineCall captures the arguments of a single DeleteChunks
// call made against deleteIndexDocEngine.
type deleteIndexDocEngineCall struct {
// condition is the filter map passed to DeleteChunks.
condition map[string]interface{}
// indexName is the index name passed to DeleteChunks.
indexName string
// datasetID is the dataset ID passed to DeleteChunks.
datasetID string
}
// DeleteChunks records the call's condition, index name and dataset ID in
// deleteCalls and reports one deleted chunk with no error. The context is
// ignored.
func (e *deleteIndexDocEngine) DeleteChunks(_ context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) {
	recorded := deleteIndexDocEngineCall{}
	recorded.condition = condition
	recorded.indexName = indexName
	recorded.datasetID = datasetID

	e.deleteCalls = append(e.deleteCalls, recorded)
	return 1, nil
}
// testDatasetServiceForDeleteIndex builds a DatasetService wired with freshly
// constructed knowledgebase and task DAOs and the supplied recording document
// engine.
func testDatasetServiceForDeleteIndex(docEngine *deleteIndexDocEngine) *DatasetService {
	svc := new(DatasetService)
	svc.kbDAO = dao.NewKnowledgebaseDAO()
	svc.taskDAO = dao.NewTaskDAO()
	svc.docEngine = docEngine
	return svc
}
// insertDeleteIndexKB inserts the fixture knowledgebase "kb-1" (tenant and
// creator "user-1") into the test database. For the index types "graph",
// "raptor" and "mindmap" the matching task-ID and task-finish-time fields are
// populated with taskID and a fixed timestamp. When taskID is non-empty, a
// task row with that ID is inserted as well. Any insert failure aborts the
// test.
func insertDeleteIndexKB(t *testing.T, indexType string, taskID string) {
	t.Helper()

	finishedAt := time.Date(2026, 6, 23, 10, 0, 0, 0, time.UTC)
	record := entity.Knowledgebase{
		ID:           "kb-1",
		TenantID:     "user-1",
		Name:         "test-kb",
		EmbdID:       "embedding@OpenAI",
		CreatedBy:    "user-1",
		Permission:   string(entity.TenantPermissionMe),
		ParserID:     "naive",
		ParserConfig: entity.JSONMap{},
		Status:       sptr("1"),
	}

	// Fill only the task fields that belong to the requested index type.
	if indexType == "graph" {
		record.GraphragTaskID = &taskID
		record.GraphragTaskFinishAt = &finishedAt
	} else if indexType == "raptor" {
		record.RaptorTaskID = &taskID
		record.RaptorTaskFinishAt = &finishedAt
	} else if indexType == "mindmap" {
		record.MindmapTaskID = &taskID
		record.MindmapTaskFinishAt = &finishedAt
	}

	if err := dao.DB.Create(&record).Error; err != nil {
		t.Fatalf("insert kb: %v", err)
	}

	if taskID == "" {
		return
	}
	task := entity.Task{ID: taskID, DocID: "doc-1", TaskType: indexTypeToTaskType[indexType]}
	if err := dao.DB.Create(&task).Error; err != nil {
		t.Fatalf("insert task: %v", err)
	}
}
func TestDatasetServiceDeleteIndexGraphWipeFalseOnlyCancelsTask(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertDeleteIndexKB(t, "graph", "graph-task")
docEngine := &deleteIndexDocEngine{}
code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "graph", false)
if err != nil {
t.Fatalf("DeleteIndex failed: %v", err)
}
if code != common.CodeSuccess {
t.Fatalf("expected success code, got %d", code)
}
if len(docEngine.deleteCalls) != 0 {
t.Fatalf("wipe=false should not delete doc-store artefacts, got %#v", docEngine.deleteCalls)
}
assertDeleteIndexTaskDeleted(t, "graph-task")
kb := getDeleteIndexKB(t)
if kb.GraphragTaskID == nil || *kb.GraphragTaskID != "" {
t.Fatalf("expected graphrag_task_id to be cleared to empty string, got %#v", kb.GraphragTaskID)
}
if kb.GraphragTaskFinishAt != nil {
t.Fatalf("expected graphrag_task_finish_at to be cleared, got %#v", kb.GraphragTaskFinishAt)
}
}
func TestDatasetServiceDeleteIndexGraphWipeTrueDeletesArtefacts(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertDeleteIndexKB(t, "graph", "graph-task")
docEngine := &deleteIndexDocEngine{}
code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "graph", true)
if err != nil {
t.Fatalf("DeleteIndex failed: %v", err)
}
if code != common.CodeSuccess {
t.Fatalf("expected success code, got %d", code)
}
if len(docEngine.deleteCalls) != 1 {
t.Fatalf("expected one doc-store delete call, got %#v", docEngine.deleteCalls)
}
call := docEngine.deleteCalls[0]
if call.indexName != "ragflow_user-1" || call.datasetID != "kb-1" {
t.Fatalf("unexpected delete target: %#v", call)
}
if call.condition["kb_id"] != "kb-1" {
t.Fatalf("delete condition must include kb_id, got %#v", call.condition)
}
assertStringSet(t, call.condition["knowledge_graph_kwd"], []string{"graph", "subgraph", "entity", "relation", "community_report"})
assertDeleteIndexTaskDeleted(t, "graph-task")
}
func TestDatasetServiceDeleteIndexRaptorWipeTrueDeletesRaptorArtefacts(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertDeleteIndexKB(t, "raptor", "raptor-task")
docEngine := &deleteIndexDocEngine{}
code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "raptor", true)
if err != nil {
t.Fatalf("DeleteIndex failed: %v", err)
}
if code != common.CodeSuccess {
t.Fatalf("expected success code, got %d", code)
}
if len(docEngine.deleteCalls) != 1 {
t.Fatalf("expected one doc-store delete call, got %#v", docEngine.deleteCalls)
}
call := docEngine.deleteCalls[0]
if call.condition["kb_id"] != "kb-1" {
t.Fatalf("delete condition must include kb_id, got %#v", call.condition)
}
assertStringSet(t, call.condition["raptor_kwd"], []string{"raptor"})
assertDeleteIndexTaskDeleted(t, "raptor-task")
kb := getDeleteIndexKB(t)
if kb.RaptorTaskID == nil || *kb.RaptorTaskID != "" {
t.Fatalf("expected raptor_task_id to be cleared to empty string, got %#v", kb.RaptorTaskID)
}
if kb.RaptorTaskFinishAt != nil {
t.Fatalf("expected raptor_task_finish_at to be cleared, got %#v", kb.RaptorTaskFinishAt)
}
}
func TestDatasetServiceDeleteIndexMindmapDoesNotDeleteDocStore(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
insertDeleteIndexKB(t, "mindmap", "mindmap-task")
docEngine := &deleteIndexDocEngine{}
code, err := testDatasetServiceForDeleteIndex(docEngine).DeleteIndex("user-1", "kb-1", "mindmap", true)
if err != nil {
t.Fatalf("DeleteIndex failed: %v", err)
}
if code != common.CodeSuccess {
t.Fatalf("expected success code, got %d", code)
}
if len(docEngine.deleteCalls) != 0 {
t.Fatalf("mindmap delete should not delete doc-store artefacts, got %#v", docEngine.deleteCalls)
}
assertDeleteIndexTaskDeleted(t, "mindmap-task")
kb := getDeleteIndexKB(t)
if kb.MindmapTaskID == nil || *kb.MindmapTaskID != "" {
t.Fatalf("expected mindmap_task_id to be cleared to empty string, got %#v", kb.MindmapTaskID)
}
if kb.MindmapTaskFinishAt != nil {
t.Fatalf("expected mindmap_task_finish_at to be cleared, got %#v", kb.MindmapTaskFinishAt)
}
}
func TestDatasetServiceDeleteIndexRejectsInvalidType(t *testing.T) {
db := setupServiceTestDB(t)
pushServiceDB(t, db)
code, err := testDatasetServiceForDeleteIndex(&deleteIndexDocEngine{}).DeleteIndex("user-1", "kb-1", "invalid", true)
if err == nil {
t.Fatal("expected invalid index type error")
}
if code != common.CodeArgumentError {
t.Fatalf("expected argument error code, got %d", code)
}
}
// assertDeleteIndexTaskDeleted fails the test unless looking up the task with
// the given ID reports gorm.ErrRecordNotFound.
func assertDeleteIndexTaskDeleted(t *testing.T, taskID string) {
	t.Helper()

	var found entity.Task
	lookupErr := dao.DB.Where("id = ?", taskID).First(&found).Error
	if errors.Is(lookupErr, gorm.ErrRecordNotFound) {
		return
	}
	t.Fatalf("expected task %s to be deleted, got err=%v task=%#v", taskID, lookupErr, found)
}
// getDeleteIndexKB loads the fixture knowledgebase "kb-1" from the test
// database, aborting the test when the lookup fails.
func getDeleteIndexKB(t *testing.T) entity.Knowledgebase {
	t.Helper()

	var stored entity.Knowledgebase
	lookupErr := dao.DB.Where("id = ?", "kb-1").First(&stored).Error
	if lookupErr != nil {
		t.Fatalf("fetch kb: %v", lookupErr)
	}
	return stored
}
// assertStringSet fails the test unless actual is a []interface{} of strings
// with the same length as expected and containing every expected value,
// regardless of order.
func assertStringSet(t *testing.T, actual interface{}, expected []string) {
	t.Helper()

	items, isSlice := actual.([]interface{})
	switch {
	case !isSlice:
		t.Fatalf("expected []interface{}, got %#v", actual)
	case len(items) != len(expected):
		t.Fatalf("expected %d items, got %#v", len(expected), items)
	}

	// Collect the actual values, rejecting any non-string element.
	present := make(map[string]struct{}, len(items))
	for i := range items {
		str, isString := items[i].(string)
		if !isString {
			t.Fatalf("expected string item, got %#v", items[i])
		}
		present[str] = struct{}{}
	}

	for _, want := range expected {
		if _, found := present[want]; !found {
			t.Fatalf("missing %q in %#v", want, items)
		}
	}
}