diff --git a/internal/handler/document.go b/internal/handler/document.go index e637ceac4f..cdd1d348d6 100644 --- a/internal/handler/document.go +++ b/internal/handler/document.go @@ -64,6 +64,7 @@ type documentServiceIface interface { IngestDocuments(datasetID, userID string, docIDs []string) ([]*service.ParseDocumentResponse, error) StopIngestionTasks(tasks []string, userID string) ([]*entity.IngestionTask, error) RemoveIngestionTasks(tasks []string, userID string) ([]map[string]string, error) + BatchUpdateDocumentStatus(userID, datasetID, status string, DocumentIDs []string) (map[string]interface{}, common.ErrorCode, error) } // DocumentHandler document handler @@ -398,6 +399,79 @@ func (h *DocumentHandler) DeleteDocuments(c *gin.Context) { jsonResponse(c, common.CodeSuccess, map[string]interface{}{"deleted": deleted}, "success") } +// BatchUpdateDocumentStatus Batch update status of documents within a dataset. +func (h *DocumentHandler) BatchUpdateDocumentStatus(c *gin.Context) { + user, code, errorMessage := GetUser(c) + if code != common.CodeSuccess { + jsonError(c, code, errorMessage) + return + } + + userID := strings.TrimSpace(user.ID) + if userID == "" { + jsonError(c, common.CodeArgumentError, "invalid user id") + return + } + + datasetID := strings.TrimSpace(c.Param("dataset_id")) + if datasetID == "" { + jsonError(c, common.CodeArgumentError, "dataset_id is required") + return + } + + var req struct { + DocumentIDs []interface{} `json:"doc_ids"` + Status interface{} `json:"status"` + } + if err := c.ShouldBindJSON(&req); err != nil { + jsonError(c, common.CodeDataError, err.Error()) + return + } + + if req.DocumentIDs == nil || len(req.DocumentIDs) == 0 { + jsonError(c, common.CodeArgumentError, `"doc_ids" must be a non-empty list.`) + return + } + documentIDs := make([]string, 0, len(req.DocumentIDs)) + for _, rawDocID := range req.DocumentIDs { + docID, ok := rawDocID.(string) + if !ok || strings.TrimSpace(docID) == "" { + jsonError(c, common.CodeArgumentError, `"doc_ids" must contain non-empty document IDs.`) + return + } + documentIDs = append(documentIDs, docID) + } + + status := "-1" + if req.Status != nil { + status = fmt.Sprint(req.Status) + } + if status != "0" && status != "1" { + jsonError(c, common.CodeArgumentError, fmt.Sprintf(`"Status" must be either 0 or 1:%s!`, status)) + return + } + + result, code, err := h.documentService.BatchUpdateDocumentStatus(userID, datasetID, status, documentIDs) + if err != nil { + message := err.Error() + if code == common.CodeServerError { + message = "Partial failure" + } + c.JSON(http.StatusOK, gin.H{ + "code": code, + "data": result, + "message": message, + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": code, + "data": result, + "message": "success", + }) +} + // ListDocuments document list func (h *DocumentHandler) ListDocuments(c *gin.Context) { diff --git a/internal/handler/document_test.go b/internal/handler/document_test.go index 472ccb7ac4..93d807539d 100644 --- a/internal/handler/document_test.go +++ b/internal/handler/document_test.go @@ -111,6 +111,9 @@ func (f *fakeDocumentService) ListDocuments(page, pageSize int) ([]*service.Docu func (f *fakeDocumentService) ListDocumentsByDatasetID(kbID string, page, pageSize int) ([]*entity.DocumentListItem, int64, error) { return nil, 0, nil } +func (f *fakeDocumentService) BatchUpdateDocumentStatus(userID, datasetID, status string, documentIDs []string) (map[string]interface{}, common.ErrorCode, error) { + return map[string]interface{}{}, common.CodeSuccess, nil +} func (f *fakeDocumentService) GetThumbnail(docID string) (*service.ThumbnailResponse, error) { return nil, nil } diff --git a/internal/router/router.go b/internal/router/router.go index f0d5f5d103..26bf3670ae 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -273,6 +273,7 @@ func (r *Router) Setup(engine *gin.Engine) { datasets.PUT("/:dataset_id", r.datasetsHandler.UpdateDataset) datasets.GET("/:dataset_id/graph", r.datasetsHandler.GetKnowledgeGraph) datasets.DELETE("/:dataset_id/tags", r.datasetsHandler.RemoveTags) + datasets.POST("/:dataset_id/documents/batch-update-status", r.documentHandler.BatchUpdateDocumentStatus) datasets.GET("/:dataset_id/index", r.datasetsHandler.TraceIndex) datasets.POST("/:dataset_id/index", r.datasetsHandler.RunIndex) datasets.DELETE("/:dataset_id/graph", r.datasetsHandler.DeleteKnowledgeGraph) diff --git a/internal/service/document.go b/internal/service/document.go index 1ba84d09a5..b8f8042802 100644 --- a/internal/service/document.go +++ b/internal/service/document.go @@ -731,6 +731,95 @@ func (s *DocumentService) GetThumbnail(docID string) (*ThumbnailResponse, error) return &result, nil } +func (s *DocumentService) BatchUpdateDocumentStatus(userID, datasetID, status string, documentIDs []string) (map[string]interface{}, common.ErrorCode, error) { + kb, err := s.kbDAO.GetByIDAndTenantID(datasetID, userID) + if err != nil { + return nil, common.CodeDataError, fmt.Errorf("You don't own the dataset.") + } + statusInt, convErr := strconv.Atoi(status) + if convErr != nil { + return nil, common.CodeArgumentError, fmt.Errorf("invalid status: %s", status) + } + + result := make(map[string]interface{}, len(documentIDs)) + hasError := false + + documents, err := s.documentDAO.GetByIDs(documentIDs) + if err != nil { + return nil, common.CodeServerError, fmt.Errorf("failed to fetch documents: %w", err) + } + documentByID := make(map[string]*entity.Document, len(documents)) + for _, doc := range documents { + documentByID[doc.ID] = doc + } + + for _, docID := range documentIDs { + doc, ok := documentByID[docID] + if !ok { + result[docID] = map[string]string{"error": "Document not found"} + hasError = true + continue + } + + if doc.KbID != datasetID { + result[docID] = map[string]string{"error": "Document not found in this dataset."} + hasError = true + continue + } + + currentStatus := "" + if doc.Status != nil { + currentStatus = *doc.Status + } + if currentStatus == status { + result[docID] = map[string]string{"status": status} + continue + } + previousStatus := interface{}(nil) + if doc.Status != nil { + previousStatus = *doc.Status + } + if err := s.documentDAO.UpdateByID(docID, map[string]interface{}{"status": status}); err != nil { + result[docID] = map[string]string{"error": "Database error (Document update)!"} + hasError = true + continue + } + + if doc.ChunkNum > 0 { + if s.docEngine == nil { + _ = s.documentDAO.UpdateByID(docID, map[string]interface{}{"status": previousStatus}) + result[docID] = map[string]string{"error": "Document store update failed: document engine not initialized"} + hasError = true + continue + } + err := s.docEngine.UpdateChunks( + context.Background(), + map[string]interface{}{"doc_id": docID}, + map[string]interface{}{"available_int": statusInt}, + fmt.Sprintf("ragflow_%s", kb.TenantID), + doc.KbID, + ) + if err != nil { + _ = s.documentDAO.UpdateByID(docID, map[string]interface{}{"status": previousStatus}) + msg := err.Error() + if strings.Contains(msg, "3022") { + result[docID] = map[string]string{"error": "Document store table missing."} + } else { + result[docID] = map[string]string{"error": "Document store update failed: " + msg} + } + hasError = true + continue + } + } + result[docID] = map[string]string{"status": status} + } + + if hasError { + return result, common.CodeServerError, fmt.Errorf("Partial failure") + } + return result, common.CodeSuccess, nil +} + // ListDocumentsByDatasetID list documents by knowledge base ID func (s *DocumentService) ListDocumentsByDatasetID(kbID string, page, pageSize int) ([]*entity.DocumentListItem, int64, error) { offset := (page - 1) * pageSize