feat[Go]: implement /datasets/<dataset_id>/documents/batch-update-status (#16258)

### What problem does this PR solve?

accident close #16072

As title

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Haruko386
2026-06-23 19:19:08 +08:00
committed by GitHub
parent 6cbd069ea3
commit 5046626c17
4 changed files with 167 additions and 0 deletions

View File

@@ -64,6 +64,7 @@ type documentServiceIface interface {
IngestDocuments(datasetID, userID string, docIDs []string) ([]*service.ParseDocumentResponse, error)
StopIngestionTasks(tasks []string, userID string) ([]*entity.IngestionTask, error)
RemoveIngestionTasks(tasks []string, userID string) ([]map[string]string, error)
BatchUpdateDocumentStatus(userID, datasetID, status string, DocumentIDs []string) (map[string]interface{}, common.ErrorCode, error)
}
// DocumentHandler document handler
@@ -398,6 +399,79 @@ func (h *DocumentHandler) DeleteDocuments(c *gin.Context) {
jsonResponse(c, common.CodeSuccess, map[string]interface{}{"deleted": deleted}, "success")
}
// BatchUpdateDocumentStatus Batch update status of documents within a dataset.
func (h *DocumentHandler) BatchUpdateDocumentStatus(c *gin.Context) {
user, code, errorMessage := GetUser(c)
if code != common.CodeSuccess {
jsonError(c, code, errorMessage)
return
}
userID := strings.TrimSpace(user.ID)
if userID == "" {
jsonError(c, common.CodeArgumentError, "invalid user id")
return
}
datasetID := strings.TrimSpace(c.Param("dataset_id"))
if datasetID == "" {
jsonError(c, common.CodeArgumentError, "dataset_id is required")
return
}
var req struct {
DocumentIDs []interface{} `json:"doc_ids"`
Status interface{} `json:"status"`
}
if err := c.ShouldBindJSON(&req); err != nil {
jsonError(c, common.CodeDataError, err.Error())
return
}
if req.DocumentIDs == nil || len(req.DocumentIDs) == 0 {
jsonError(c, common.CodeArgumentError, `"doc_ids" must be a non-empty list.`)
return
}
documentIDs := make([]string, 0, len(req.DocumentIDs))
for _, rawDocID := range req.DocumentIDs {
docID, ok := rawDocID.(string)
if !ok || strings.TrimSpace(docID) == "" {
jsonError(c, common.CodeArgumentError, `"doc_ids" must contain non-empty document IDs.`)
return
}
documentIDs = append(documentIDs, docID)
}
status := "-1"
if req.Status != nil {
status = fmt.Sprint(req.Status)
}
if status != "0" && status != "1" {
jsonError(c, common.CodeArgumentError, fmt.Sprintf(`"Status" must be either 0 or 1:%s!`, status))
return
}
result, code, err := h.documentService.BatchUpdateDocumentStatus(userID, datasetID, status, documentIDs)
if err != nil {
message := err.Error()
if code == common.CodeServerError {
message = "Partial failure"
}
c.JSON(http.StatusOK, gin.H{
"code": code,
"data": result,
"message": message,
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": code,
"data": result,
"message": "success",
})
}
// ListDocuments document list
func (h *DocumentHandler) ListDocuments(c *gin.Context) {

View File

@@ -111,6 +111,9 @@ func (f *fakeDocumentService) ListDocuments(page, pageSize int) ([]*service.Docu
func (f *fakeDocumentService) ListDocumentsByDatasetID(kbID string, page, pageSize int) ([]*entity.DocumentListItem, int64, error) {
return nil, 0, nil
}
func (f *fakeDocumentService) BatchUpdateDocumentStatus(userID, datasetID, status string, documentIDs []string) (map[string]interface{}, common.ErrorCode, error) {
return map[string]interface{}{}, common.CodeSuccess, nil
}
func (f *fakeDocumentService) GetThumbnail(docID string) (*service.ThumbnailResponse, error) {
return nil, nil
}

View File

@@ -273,6 +273,7 @@ func (r *Router) Setup(engine *gin.Engine) {
datasets.PUT("/:dataset_id", r.datasetsHandler.UpdateDataset)
datasets.GET("/:dataset_id/graph", r.datasetsHandler.GetKnowledgeGraph)
datasets.DELETE("/:dataset_id/tags", r.datasetsHandler.RemoveTags)
datasets.POST("/:dataset_id/documents/batch-update-status", r.documentHandler.BatchUpdateDocumentStatus)
datasets.GET("/:dataset_id/index", r.datasetsHandler.TraceIndex)
datasets.POST("/:dataset_id/index", r.datasetsHandler.RunIndex)
datasets.DELETE("/:dataset_id/graph", r.datasetsHandler.DeleteKnowledgeGraph)

View File

@@ -731,6 +731,95 @@ func (s *DocumentService) GetThumbnail(docID string) (*ThumbnailResponse, error)
return &result, nil
}
func (s *DocumentService) BatchUpdateDocumentStatus(userID, datasetID, status string, documentIDs []string) (map[string]interface{}, common.ErrorCode, error) {
kb, err := s.kbDAO.GetByIDAndTenantID(datasetID, userID)
if err != nil {
return nil, common.CodeDataError, fmt.Errorf("You don't own the dataset.")
}
statusInt, convErr := strconv.Atoi(status)
if convErr != nil {
return nil, common.CodeArgumentError, fmt.Errorf("invalid status: %s", status)
}
result := make(map[string]interface{}, len(documentIDs))
hasError := false
documents, err := s.documentDAO.GetByIDs(documentIDs)
if err != nil {
return nil, common.CodeServerError, fmt.Errorf("failed to fetch documents: %w", err)
}
documentByID := make(map[string]*entity.Document, len(documents))
for _, doc := range documents {
documentByID[doc.ID] = doc
}
for _, docID := range documentIDs {
doc, ok := documentByID[docID]
if !ok {
result[docID] = map[string]string{"error": "Document not found"}
hasError = true
continue
}
if doc.KbID != datasetID {
result[docID] = map[string]string{"error": "Document not found in this dataset."}
hasError = true
continue
}
currentStatus := ""
if doc.Status != nil {
currentStatus = *doc.Status
}
if currentStatus == status {
result[docID] = map[string]string{"status": status}
continue
}
previousStatus := interface{}(nil)
if doc.Status != nil {
previousStatus = *doc.Status
}
if err := s.documentDAO.UpdateByID(docID, map[string]interface{}{"status": status}); err != nil {
result[docID] = map[string]string{"error": "Database error (Document update)!"}
hasError = true
continue
}
if doc.ChunkNum > 0 {
if s.docEngine == nil {
_ = s.documentDAO.UpdateByID(docID, map[string]interface{}{"status": previousStatus})
result[docID] = map[string]string{"error": "Document store update failed: document engine not initialized"}
hasError = true
continue
}
err := s.docEngine.UpdateChunks(
context.Background(),
map[string]interface{}{"doc_id": docID},
map[string]interface{}{"available_int": statusInt},
fmt.Sprintf("ragflow_%s", kb.TenantID),
doc.KbID,
)
if err != nil {
_ = s.documentDAO.UpdateByID(docID, map[string]interface{}{"status": previousStatus})
msg := err.Error()
if strings.Contains(msg, "3022") {
result[docID] = map[string]string{"error": "Document store table missing."}
} else {
result[docID] = map[string]string{"error": "Document store update failed: " + msg}
}
hasError = true
continue
}
}
result[docID] = map[string]string{"status": status}
}
if hasError {
return result, common.CodeServerError, fmt.Errorf("Partial failure")
}
return result, common.CodeSuccess, nil
}
// ListDocumentsByDatasetID list documents by knowledge base ID
func (s *DocumentService) ListDocumentsByDatasetID(kbID string, page, pageSize int) ([]*entity.DocumentListItem, int64, error) {
offset := (page - 1) * pageSize