mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Decouple skill space from Python API (#15971)
### What problem does this PR solve?

Make skill space independent of the Python filesystem API.

### Type of change

- [x] Refactoring
This commit is contained in:
@@ -522,10 +522,7 @@ func (h *SkillSearchHandler) DeleteSpace(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Get Authorization header for Python API calls
|
||||
authHeader := c.GetHeader("Authorization")
|
||||
|
||||
code, err := h.spaceService.DeleteSpace(spaceID, user.ID, h.docEngine, authHeader)
|
||||
code, err := h.spaceService.DeleteSpace(spaceID, user.ID, h.docEngine, c.Request.Context())
|
||||
if err != nil {
|
||||
jsonError(c, code, err.Error())
|
||||
return
|
||||
|
||||
@@ -17,13 +17,8 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"ragflow/internal/common"
|
||||
"ragflow/internal/dao"
|
||||
"ragflow/internal/engine"
|
||||
@@ -40,6 +35,7 @@ import (
|
||||
type SkillSpaceService struct {
|
||||
spaceDAO *dao.SkillSpaceDAO
|
||||
fileDAO *dao.FileDAO
|
||||
fileService *FileService
|
||||
configDAO *dao.SkillSearchConfigDAO
|
||||
tenantDAO *dao.TenantDAO
|
||||
skillsFolderCache map[string]string // tenant-keyed cache for skills folder ID
|
||||
@@ -53,6 +49,7 @@ func NewSkillSpaceService() *SkillSpaceService {
|
||||
return &SkillSpaceService{
|
||||
spaceDAO: dao.NewSkillSpaceDAO(),
|
||||
fileDAO: dao.NewFileDAO(),
|
||||
fileService: NewFileService(),
|
||||
configDAO: dao.NewSkillSearchConfigDAO(),
|
||||
tenantDAO: dao.NewTenantDAO(),
|
||||
skillsFolderCache: make(map[string]string),
|
||||
@@ -379,94 +376,9 @@ func (s *SkillSpaceService) UpdateSpace(spaceID string, tenantID string, req *Up
|
||||
return space.ToMap(), common.CodeSuccess, nil
|
||||
}
|
||||
|
||||
// getPythonServiceURL returns the files endpoint of the Python service.
//
// The base address is read from the PYTHON_SERVICE_URL environment variable
// and falls back to http://127.0.0.1:9380 when the variable is unset or blank.
// A missing scheme defaults to http://, and the /api/v1/files path is appended
// unless the configured value already ends with it. Surrounding whitespace and
// trailing slashes in the configured value are ignored.
func getPythonServiceURL() string {
	const (
		defaultBase = "http://127.0.0.1:9380"
		filesPath   = "/api/v1/files"
	)

	// Normalise first: without this, a value such as ".../api/v1/files/" would
	// fail the suffix check below and get the API path appended a second time.
	endpoint := strings.TrimRight(strings.TrimSpace(os.Getenv("PYTHON_SERVICE_URL")), "/")
	if endpoint == "" {
		endpoint = defaultBase
	}

	// Ensure URL has scheme
	if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") {
		endpoint = "http://" + endpoint
	}

	// Ensure URL has the API path
	if !strings.HasSuffix(endpoint, filesPath) {
		endpoint += filesPath
	}
	return endpoint
}
|
||||
|
||||
// deleteFolderViaPythonAPI calls Python backend API to delete folder and its storage
|
||||
func (s *SkillSpaceService) deleteFolderViaPythonAPI(folderID, tenantID, authHeader string) error {
|
||||
pythonURL := getPythonServiceURL()
|
||||
|
||||
reqBody := map[string]interface{}{
|
||||
"ids": []string{folderID},
|
||||
}
|
||||
jsonData, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal request: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("DELETE", pythonURL, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
// Use request context with timeout to prevent indefinite blocking
|
||||
deleteCtx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
||||
defer cancel()
|
||||
req = req.WithContext(deleteCtx)
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
// Extract raw token from "Bearer <token>" format if present
|
||||
// Python backend needs the raw token for authentication
|
||||
authToken := authHeader
|
||||
if strings.HasPrefix(strings.ToLower(authHeader), "bearer ") {
|
||||
authToken = strings.TrimSpace(authHeader[7:])
|
||||
}
|
||||
req.Header.Set("Authorization", authToken)
|
||||
// Set tenant ID header for Python backend
|
||||
req.Header.Set("X-tenant-id", tenantID)
|
||||
|
||||
common.Info("Calling Python API to delete folder", zap.String("folderID", folderID), zap.String("tenantID", tenantID))
|
||||
|
||||
client := &http.Client{Timeout: 60 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to call Python API: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
common.Info("Python API delete folder response", zap.String("folderID", folderID), zap.Int("status", resp.StatusCode), zap.String("body", string(body)))
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("Python API returned status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// Parse response to check if deletion was successful
|
||||
var result map[string]interface{}
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
return fmt.Errorf("failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
if code, ok := result["code"].(float64); !ok || int(code) != 0 {
|
||||
message := "unknown error"
|
||||
if msg, ok := result["message"].(string); ok {
|
||||
message = msg
|
||||
}
|
||||
return fmt.Errorf("Python API returned error: %s", message)
|
||||
}
|
||||
|
||||
common.Info("Successfully deleted folder via Python API", zap.String("folderID", folderID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteSpace starts asynchronous deletion of a skills space and returns immediately.
|
||||
// The space status is set to "deleting" and the actual cleanup runs in a background goroutine.
|
||||
func (s *SkillSpaceService) DeleteSpace(spaceID, tenantID string, docEngine engine.DocEngine, authHeader string) (common.ErrorCode, error) {
|
||||
func (s *SkillSpaceService) DeleteSpace(spaceID, tenantID string, docEngine engine.DocEngine, ctx context.Context) (common.ErrorCode, error) {
|
||||
// Get space regardless of status (could be retrying a failed delete)
|
||||
space, err := s.spaceDAO.GetByIDAnyStatus(spaceID)
|
||||
if err != nil {
|
||||
@@ -503,14 +415,14 @@ func (s *SkillSpaceService) DeleteSpace(spaceID, tenantID string, docEngine engi
|
||||
common.Info("Space marked as deleting, starting async cleanup", zap.String("spaceID", spaceID), zap.String("tenantID", tenantID))
|
||||
|
||||
// Launch async deletion in background goroutine
|
||||
go s.asyncDeleteSpace(spaceID, space.FolderID, tenantID, docEngine, authHeader)
|
||||
go s.asyncDeleteSpace(spaceID, space.FolderID, tenantID, docEngine, ctx)
|
||||
|
||||
return common.CodeSuccess, nil
|
||||
}
|
||||
|
||||
// asyncDeleteSpace performs the actual deletion work in the background.
|
||||
// It deletes the search index, removes files via Python API, and soft-deletes the space record.
|
||||
func (s *SkillSpaceService) asyncDeleteSpace(spaceID, folderID, tenantID string, docEngine engine.DocEngine, authHeader string) {
|
||||
// It deletes the search index, removes files via Go FileService, and soft-deletes the space record.
|
||||
func (s *SkillSpaceService) asyncDeleteSpace(spaceID, folderID, tenantID string, docEngine engine.DocEngine, ctx context.Context) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
common.Warn("Panic in asyncDeleteSpace, marking space as deleted", zap.Any("recover", r), zap.String("spaceID", spaceID))
|
||||
@@ -532,18 +444,26 @@ func (s *SkillSpaceService) asyncDeleteSpace(spaceID, folderID, tenantID string,
|
||||
cancel()
|
||||
}
|
||||
|
||||
// Step 2: Delete folder and storage via Python API
|
||||
common.Info("Async deleting space folder via Python API", zap.String("folderID", folderID), zap.String("spaceID", spaceID))
|
||||
if err := s.deleteFolderViaPythonAPI(folderID, tenantID, authHeader); err != nil {
|
||||
common.Error(fmt.Sprintf("Failed to delete space folder via Python API during async delete, spaceID=%s", spaceID), err)
|
||||
// Retry once with a delay
|
||||
// Step 2: Delete folder and storage via Go FileService
|
||||
// Use a fresh background context with timeout, NOT the incoming ctx (which
|
||||
// is the HTTP request context canceled when the handler returns and the
|
||||
// goroutine starts executing).
|
||||
common.Info("Async deleting space folder via Go FileService", zap.String("folderID", folderID), zap.String("spaceID", spaceID))
|
||||
ctxFS, cancelFS := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancelFS()
|
||||
success, msg := s.fileService.DeleteFiles(ctxFS, tenantID, []string{folderID})
|
||||
if !success {
|
||||
common.Error(fmt.Sprintf("Failed to delete space folder via Go FileService during async delete, spaceID=%s, msg=%s", spaceID, msg), nil)
|
||||
// Retry once with a delay (same ctxFS, still valid)
|
||||
time.Sleep(5 * time.Second)
|
||||
if retryErr := s.deleteFolderViaPythonAPI(folderID, tenantID, authHeader); retryErr != nil {
|
||||
common.Error(fmt.Sprintf("Retry failed to delete space folder, marking space as deleted anyway, spaceID=%s", spaceID), retryErr)
|
||||
if retrySuccess, retryMsg := s.fileService.DeleteFiles(ctxFS, tenantID, []string{folderID}); !retrySuccess {
|
||||
common.Error(fmt.Sprintf("Retry failed to delete space folder, marking space as deleted anyway, spaceID=%s, msg=%s", spaceID, retryMsg), nil)
|
||||
// Mark as deleted even if folder deletion fails - orphaned folders can be cleaned up later
|
||||
} else {
|
||||
common.Info("Successfully deleted space folder on retry via Go FileService", zap.String("folderID", folderID))
|
||||
}
|
||||
} else {
|
||||
common.Info("Successfully deleted space folder via Python API", zap.String("folderID", folderID))
|
||||
common.Info("Successfully deleted space folder via Go FileService", zap.String("folderID", folderID))
|
||||
}
|
||||
|
||||
// Step 3: Soft delete the space record (status "2" → "0")
|
||||
|
||||
Reference in New Issue
Block a user