feat: Implement API of ragflow server in Go (#15256)

## Summary
- Implemented the Go API endpoint for Memory message forgetting:
  - `DELETE /api/v1/messages/{memory_id}:{message_id}`
- Added route registration for the Memory message DELETE endpoint only.
- Added request path validation for `memory_id:message_id`.
- Added service logic to mark a message as forgotten by setting
`forget_at`.
- Preserved Python-compatible response behavior:
  - Success returns `code: 0`, `message: true`, `data: null`.
- Added focused unit tests for message path parsing and invalid message
ID handling.
- Fixed Linux cgo linker config to use the installed shared PCRE2
library so Go tests/builds can run in this environment.
## Related Issue
Closes: #15240 
## Change Type
- [x] Feature
- [x] Test
- [x] Build / CI compatibility

## Implemented API
- `DELETE /api/v1/messages/{memory_id}:{message_id}`
## Real Behavior Proof
Validated with targeted Go tests:
```bash
/tmp/go1.25.0/bin/go test ./internal/handler ./internal/router
```
Result:
```text
ok  	ragflow/internal/handler
?   	ragflow/internal/router	[no test files]
```
Validated server entrypoint build:
```bash
/tmp/go1.25.0/bin/go build -o /tmp/ragflow-server-main ./cmd/server_main.go
```

Result:
```text
build succeeded
```
Validated patch formatting:
```bash
git diff --check
```

Result:

```text
no whitespace errors
```
## Checklist
- [x] Implemented only `DELETE
/api/v1/messages/{memory_id}:{message_id}`.
- [x] Did not implement unrelated Memory message APIs.
- [x] Added route registration.
- [x] Added handler validation.
- [x] Added service-level memory access check.
- [x] Added tests.
- [x] Ran targeted Go tests.
- [x] Ran server build validation.
- [x] Ran `git diff --check`.
This commit is contained in:
Jonathan Chang
2026-06-10 20:27:35 +07:00
committed by GitHub
parent 3e4fb8cf1c
commit dfcf226ba3
10 changed files with 322 additions and 26 deletions

View File

@@ -20,6 +20,7 @@
package dao
import (
"context"
"fmt"
"ragflow/internal/entity"
"strings"
@@ -123,8 +124,13 @@ func (dao *MemoryDAO) Create(memory *entity.Memory) error {
// - *model.Memory: Memory model pointer
// - error: Database operation error
func (dao *MemoryDAO) GetByID(id string) (*entity.Memory, error) {
return dao.GetByIDWithContext(context.Background(), id)
}
// GetByIDWithContext retrieves a memory record by ID from database with context.
func (dao *MemoryDAO) GetByIDWithContext(ctx context.Context, id string) (*entity.Memory, error) {
var memory entity.Memory
err := DB.Where("id = ?", id).First(&memory).Error
err := DB.WithContext(ctx).Where("id = ?", id).First(&memory).Error
if err != nil {
return nil, err
}

View File

@@ -17,6 +17,7 @@
package dao
import (
"context"
"fmt"
"ragflow/internal/entity"
@@ -57,8 +58,13 @@ func (dao *UserTenantDAO) Delete(id string) error {
// GetByUserID get user tenant relationships by user ID
func (dao *UserTenantDAO) GetByUserID(userID string) ([]*entity.UserTenant, error) {
return dao.GetByUserIDWithContext(context.Background(), userID)
}
// GetByUserIDWithContext gets active user tenant relationships by user ID with context.
func (dao *UserTenantDAO) GetByUserIDWithContext(ctx context.Context, userID string) ([]*entity.UserTenant, error) {
var relations []*entity.UserTenant
err := DB.Where("user_id = ? AND status = ?", userID, "1").Find(&relations).Error
err := DB.WithContext(ctx).Where("user_id = ? AND status = ?", userID, "1").Find(&relations).Error
return relations, err
}

View File

@@ -23,6 +23,7 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"reflect"
"regexp"
@@ -281,22 +282,22 @@ func (e *elasticsearchEngine) updateSingleChunk(ctx context.Context, indexName,
hits, ok := searchResult["hits"].(map[string]interface{})
if !ok {
return fmt.Errorf("elasticsearch update error: 404 Not Found")
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
hitList, ok := hits["hits"].([]interface{})
if !ok || len(hitList) == 0 {
return fmt.Errorf("elasticsearch update error: 404 Not Found")
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
firstHit, ok := hitList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("elasticsearch update error: 404 Not Found")
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
actualID, ok := firstHit["_id"].(string)
if !ok {
return fmt.Errorf("elasticsearch update error: 404 Not Found")
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
doc := copyFields(newValue)
@@ -402,6 +403,9 @@ func (e *elasticsearchEngine) updateSingleChunk(ctx context.Context, indexName,
}
defer res.Body.Close()
if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
return fmt.Errorf("elasticsearch update error: %s", res.Status())
}
}

View File

@@ -17,6 +17,7 @@
package types
import (
"errors"
"fmt"
"go.uber.org/zap"
@@ -24,6 +25,8 @@ import (
"ragflow/internal/common"
)
var ErrDocumentNotFound = errors.New("document not found")
// SearchRequest unified search request for all engines
type SearchRequest struct {
// Search target

View File

@@ -20,6 +20,7 @@
package handler
import (
"errors"
"net/http"
"os"
"strconv"
@@ -549,8 +550,8 @@ func (h *MemoryHandler) AddMessage(c *gin.Context) {
})
}
// ForgetMessage handles DELETE request for forgetting messages
// API Path: DELETE /api/v1/messages/:memory_id/:message_id
// ForgetMessage handles DELETE request for forgetting messages.
// API Path: DELETE /api/v1/messages/{memory_id}:{message_id}
//
// Function:
// - Soft-deletes the specified message (sets forget_at timestamp)
@@ -559,16 +560,82 @@ func (h *MemoryHandler) AddMessage(c *gin.Context) {
// Parameter Format:
// - memory_id: Memory ID
// - message_id: Message ID (integer)
//
// TODO: Implementation pending - depends on embedding engine
func (h *MemoryHandler) ForgetMessage(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
memoryID, messageID, err := parseMemoryMessagePath(c.Param("memory_message"))
if err != nil {
c.JSON(http.StatusOK, gin.H{
"code": common.CodeArgumentError,
"message": err.Error(),
"data": nil,
})
return
}
if err := h.memoryService.ForgetMessage(c.Request.Context(), user.ID, memoryID, messageID); err != nil {
errMsg := err.Error()
if isMemoryServiceNotFound(err) {
c.JSON(http.StatusOK, gin.H{
"code": common.CodeNotFound,
"message": errMsg,
"data": nil,
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": common.CodeServerError,
"message": "Internal server error",
"data": nil,
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": common.CodeServerError,
"message": "ForgetMessage not implemented - pending embedding engine dependency",
"code": common.CodeSuccess,
"message": true,
"data": nil,
})
}
func isMemoryServiceNotFound(err error) bool {
var notFoundErr *service.ResourceNotFoundError
return errors.As(err, &notFoundErr) && notFoundErr.Resource == "Memory"
}
func parseMemoryMessagePath(memoryMessage string) (string, int64, error) {
memoryMessage = strings.TrimSpace(memoryMessage)
if memoryMessage == "" {
return "", 0, errors.New("memory_id and message_id are required")
}
parts := strings.Split(memoryMessage, ":")
if len(parts) != 2 {
return "", 0, errors.New("message path must be formatted as memory_id:message_id")
}
memoryID := strings.TrimSpace(parts[0])
messageIDText := strings.TrimSpace(parts[1])
if memoryID == "" {
return "", 0, errors.New("memory_id is required")
}
if messageIDText == "" {
return "", 0, errors.New("message_id is required")
}
messageID, err := strconv.ParseInt(messageIDText, 10, 64)
if err != nil || messageID < 0 {
return "", 0, errors.New("message_id must be a non-negative integer")
}
return memoryID, messageID, nil
}
// UpdateMessage handles PUT request for updating message status
// API Path: PUT /api/v1/messages/:memory_id/:message_id
//

View File

@@ -0,0 +1,91 @@
package handler
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"ragflow/internal/common"
"ragflow/internal/entity"
"ragflow/internal/service"
)
func TestIsMemoryServiceNotFound(t *testing.T) {
notFoundErr := &service.ResourceNotFoundError{Resource: "Memory", ID: "memory-1"}
if !isMemoryServiceNotFound(fmt.Errorf("wrapped: %w", notFoundErr)) {
t.Fatal("expected wrapped service not found error to map to not found")
}
messageNotFoundErr := &service.ResourceNotFoundError{Resource: "Message", ID: "message-1"}
if isMemoryServiceNotFound(messageNotFoundErr) {
t.Fatal("expected non-memory resource not found error to avoid memory 404 mapping")
}
if isMemoryServiceNotFound(fmt.Errorf("backend index does not exist")) {
t.Fatal("backend text should not map to not found without service error type")
}
}
func TestParseMemoryMessagePath(t *testing.T) {
tests := []struct {
name string
value string
wantMemoryID string
wantMessageID int64
wantErr bool
}{
{name: "valid", value: "memory-1:42", wantMemoryID: "memory-1", wantMessageID: 42},
{name: "empty", value: "", wantErr: true},
{name: "missing message id", value: "memory-1:", wantErr: true},
{name: "missing memory id", value: ":42", wantErr: true},
{name: "invalid message id", value: "memory-1:not-int", wantErr: true},
{name: "negative message id", value: "memory-1:-1", wantErr: true},
{name: "too many separators", value: "memory-1:2:3", wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
memoryID, messageID, err := parseMemoryMessagePath(tt.value)
if tt.wantErr {
if err == nil {
t.Fatalf("expected error")
}
return
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if memoryID != tt.wantMemoryID || messageID != tt.wantMessageID {
t.Fatalf("got (%q, %d), want (%q, %d)", memoryID, messageID, tt.wantMemoryID, tt.wantMessageID)
}
})
}
}
func TestForgetMessageRejectsMalformedPath(t *testing.T) {
gin.SetMode(gin.TestMode)
router := gin.New()
h := NewMemoryHandler(service.NewMemoryService())
router.DELETE("/api/v1/messages/:memory_message", func(c *gin.Context) {
c.Set("user", &entity.User{ID: "user-1"})
h.ForgetMessage(c)
})
req := httptest.NewRequest(http.MethodDelete, "/api/v1/messages/memory-1:not-int", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want %d", w.Code, http.StatusOK)
}
var res map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &res); err != nil {
t.Fatalf("decode response: %v", err)
}
if code := common.ErrorCode(res["code"].(float64)); code != common.CodeArgumentError {
t.Fatalf("code = %v, want %v; body=%s", code, common.CodeArgumentError, w.Body.String())
}
}

View File

@@ -308,16 +308,11 @@ func (r *Router) Setup(engine *gin.Engine) {
memory.GET("/:memory_id", r.memoryHandler.GetMemoryMessages)
}
// TODO: Message routes - Implementation pending - depends on CanvasService, TaskService and embedding engine
// message := v1.Group("/messages")
// {
// message.POST("", r.memoryHandler.AddMessage)
// message.DELETE("/:memory_id/:message_id", r.memoryHandler.ForgetMessage)
// message.PUT("/:memory_id/:message_id", r.memoryHandler.UpdateMessage)
// message.GET("/search", r.memoryHandler.SearchMessage)
// message.GET("", r.memoryHandler.GetMessages)
// message.GET("/:memory_id/:message_id/content", r.memoryHandler.GetMessageContent)
// }
// Message routes
message := v1.Group("/messages")
{
message.DELETE("/:memory_message", r.memoryHandler.ForgetMessage)
}
// Skill search routes
skills := v1.Group("/skills")

View File

@@ -17,6 +17,7 @@
package service
import (
"context"
"errors"
"fmt"
"ragflow/internal/common"
@@ -26,6 +27,8 @@ import (
"time"
"ragflow/internal/dao"
"ragflow/internal/engine"
enginetypes "ragflow/internal/engine/types"
)
const (
@@ -72,6 +75,16 @@ var validForgettingPolicies = map[ForgettingPolicy]bool{
ForgettingPolicyFIFO: true,
}
// ResourceNotFoundError marks client-visible missing memory/message resources.
type ResourceNotFoundError struct {
Resource string
ID string
}
func (e *ResourceNotFoundError) Error() string {
return fmt.Sprintf("%s '%s' not found.", e.Resource, e.ID)
}
//
// Note: CalculateMemoryType and GetMemoryTypeHuman functions have been moved to dao package
// Use dao.CalculateMemoryType() and dao.GetMemoryTypeHuman() instead
@@ -226,6 +239,7 @@ func generateOutputFormat(typesToExtract []string) string {
// It provides methods for creating, updating, deleting, and querying memories
type MemoryService struct {
memoryDAO *dao.MemoryDAO
docEngine engine.DocEngine
}
// NewMemoryService creates a new MemoryService instance
@@ -235,6 +249,7 @@ type MemoryService struct {
func NewMemoryService() *MemoryService {
return &MemoryService{
memoryDAO: dao.NewMemoryDAO(),
docEngine: engine.Get(),
}
}
@@ -755,6 +770,82 @@ func (s *MemoryService) DeleteMemory(memoryID string) error {
return nil
}
// ForgetMessage marks a memory message as forgotten by setting forget_at.
// This mirrors Python memory_api_service.forget_message and keeps the message
// record for retention/cleanup policies instead of deleting it immediately.
func (s *MemoryService) ForgetMessage(ctx context.Context, userID string, memoryID string, messageID int64) error {
memory, err := s.requireMemoryAccess(ctx, userID, memoryID)
if err != nil {
return err
}
if s.docEngine == nil {
return errors.New("message store is not initialized")
}
now := time.Now().UTC()
forgetTime := now.Format("2006-01-02 15:04:05")
messageDocID := fmt.Sprintf("%s_%d", memoryID, messageID)
updates := map[string]interface{}{
"forget_at": forgetTime,
"forget_at_flt": now.UnixMilli(),
}
condition := map[string]interface{}{
"id": messageDocID,
}
indexName := fmt.Sprintf("memory_%s", memory.TenantID)
if err := s.docEngine.UpdateChunks(ctx, condition, updates, indexName, memoryID); err != nil {
if isMessageDocumentNotFound(err) {
// Match Python delete-by-query behavior: forgetting an already-missing
// message document is idempotent and still considered successful.
return nil
}
return fmt.Errorf("failed to forget message '%d' in memory '%s': %w", messageID, memoryID, err)
}
return nil
}
func isMessageDocumentNotFound(err error) bool {
return errors.Is(err, enginetypes.ErrDocumentNotFound)
}
func (s *MemoryService) requireMemoryAccess(ctx context.Context, userID string, memoryID string) (*entity.Memory, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
memory, err := s.memoryDAO.GetByIDWithContext(ctx, memoryID)
if err != nil {
if dao.IsNotFoundErr(err) {
return nil, &ResourceNotFoundError{Resource: "Memory", ID: memoryID}
}
return nil, fmt.Errorf("failed to get memory '%s': %w", memoryID, err)
}
if memory.TenantID == userID {
return memory, nil
}
if memory.Permissions != string(TenantPermissionTeam) {
return nil, &ResourceNotFoundError{Resource: "Memory", ID: memoryID}
}
if err := ctx.Err(); err != nil {
return nil, err
}
userTenantService := NewUserTenantService()
userTenants, err := userTenantService.GetUserTenantRelationByUserIDWithContext(ctx, userID)
if err != nil {
return nil, err
}
for _, tenant := range userTenants {
if tenant.TenantID == memory.TenantID {
return memory, nil
}
}
return nil, &ResourceNotFoundError{Resource: "Memory", ID: memoryID}
}
// ListMemories retrieves a paginated list of memories with optional filters
// When tenantIDs is empty, it retrieves all tenants associated with the user
//
@@ -851,9 +942,6 @@ func (s *MemoryService) GetMemoryConfig(memoryID string) (*CreateMemoryResponse,
// TODO: AddMessage - Implementation pending - depends on embedding engine
// func (s *MemoryService) AddMessage(memoryIDs []string, messageDict map[string]interface{}) (bool, string, error) { ... }
// TODO: ForgetMessage - Implementation pending - depends on embedding engine
// func (s *MemoryService) ForgetMessage(memoryID string, messageID int) (bool, error) { ... }
// TODO: UpdateMessageStatus - Implementation pending - depends on embedding engine
// func (s *MemoryService) UpdateMessageStatus(memoryID string, messageID int, status bool) (bool, error) { ... }

View File

@@ -0,0 +1,30 @@
package service
import (
"context"
"errors"
"fmt"
"testing"
enginetypes "ragflow/internal/engine/types"
)
func TestIsMessageDocumentNotFound(t *testing.T) {
if !isMessageDocumentNotFound(fmt.Errorf("wrapped: %w", enginetypes.ErrDocumentNotFound)) {
t.Fatal("expected wrapped document-not-found error to be recognized")
}
if isMessageDocumentNotFound(errors.New("index does not exist")) {
t.Fatal("expected unrelated backend error to remain a server error")
}
}
func TestRequireMemoryAccessReturnsCanceledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := ctx.Err()
if _, gotErr := NewMemoryService().requireMemoryAccess(ctx, "user-1", "memory-1"); !errors.Is(gotErr, err) {
t.Fatalf("requireMemoryAccess error = %v, want %v", gotErr, err)
}
}

View File

@@ -17,6 +17,7 @@
package service
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
@@ -1116,7 +1117,12 @@ type UserTenantRelation struct {
* }
*/
func (s *UserTenantService) GetUserTenantRelationByUserID(userID string) ([]*UserTenantRelation, error) {
relations, err := s.userTenantDAO.GetByUserID(userID)
return s.GetUserTenantRelationByUserIDWithContext(context.Background(), userID)
}
// GetUserTenantRelationByUserIDWithContext retrieves all user-tenant relationships for a given user ID with context.
func (s *UserTenantService) GetUserTenantRelationByUserIDWithContext(ctx context.Context, userID string) ([]*UserTenantRelation, error) {
relations, err := s.userTenantDAO.GetByUserIDWithContext(ctx, userID)
if err != nil {
return nil, err
}