mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-03 01:01:56 +08:00
fix: unable to add metadata for file in kb (#16523)
### Summary
As title
Previously, it returned `update success` but never inserted or updated any
metadata.
fixed:
```go
_, err = s.docEngine.InsertMetadata(nil, []map[string]interface{}{
{
"id": docID,
"kb_id": doc.KbID,
"meta_fields": meta,
},
}, tenantID)
```
This commit is contained in:
@@ -136,7 +136,7 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map
|
||||
// Execute bulk request
|
||||
req := esapi.BulkRequest{
|
||||
Body: bytes.NewReader(buf.Bytes()),
|
||||
Refresh: "false",
|
||||
Refresh: "wait_for",
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
@@ -193,14 +193,14 @@ func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string,
|
||||
}
|
||||
|
||||
// Build the document ID for update
|
||||
docID = strings.ReplaceAll(docID, "'", "''")
|
||||
docIDStr := strings.ReplaceAll(docID, "'", "''")
|
||||
datasetIDStr := strings.ReplaceAll(datasetID, "'", "''")
|
||||
|
||||
// Build update body - merge meta_fields with existing
|
||||
query := map[string]interface{}{
|
||||
"bool": map[string]interface{}{
|
||||
"must": []map[string]interface{}{
|
||||
{"term": map[string]interface{}{"id": docID}},
|
||||
{"term": map[string]interface{}{"id": docIDStr}},
|
||||
{"term": map[string]interface{}{"kb_id": datasetIDStr}},
|
||||
},
|
||||
},
|
||||
@@ -243,6 +243,24 @@ func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string,
|
||||
return fmt.Errorf("elasticsearch update by query returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
var updateResponse map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&updateResponse); err != nil {
|
||||
common.Error("Failed to parse update response", err)
|
||||
return fmt.Errorf("failed to parse update response: %w", err)
|
||||
}
|
||||
if total, ok := updateResponse["total"].(float64); ok && total == 0 {
|
||||
_, err := e.InsertMetadata(ctx, []map[string]interface{}{
|
||||
{
|
||||
"id": docID,
|
||||
"kb_id": datasetID,
|
||||
"meta_fields": metaFields,
|
||||
},
|
||||
}, tenantID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert metadata: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
common.Info("ElasticsearchConnection.UpdateMetadata completes", zap.String("index_name", indexName), zap.String("docID", docID))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1972,9 +1972,7 @@ func (s *DocumentService) SetDocumentMetadata(docID string, meta map[string]inte
|
||||
return fmt.Errorf("failed to get tenant ID: %w", err)
|
||||
}
|
||||
|
||||
// Update metadata using the document engine (merges with existing)
|
||||
err = s.docEngine.UpdateMetadata(nil, docID, doc.KbID, meta, tenantID)
|
||||
if err != nil {
|
||||
if err := s.docEngine.UpdateMetadata(context.Background(), docID, doc.KbID, meta, tenantID); err != nil {
|
||||
return fmt.Errorf("failed to update metadata: %w", err)
|
||||
}
|
||||
|
||||
@@ -2652,6 +2650,35 @@ func (s *DocumentService) replaceDocumentMetadata(docID string, meta map[string]
|
||||
return s.SetDocumentMetadata(docID, map[string]interface{}(meta))
|
||||
}
|
||||
|
||||
func (s *DocumentService) patchDocumentMetadata(docID string, before, after map[string]interface{}) error {
|
||||
if s.docEngine == nil || s.metadataSvc == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
deleteKeys := make([]string, 0)
|
||||
for key := range before {
|
||||
if _, ok := after[key]; !ok {
|
||||
deleteKeys = append(deleteKeys, key)
|
||||
}
|
||||
}
|
||||
if len(deleteKeys) > 0 {
|
||||
if err := s.DeleteDocumentMetadata(docID, deleteKeys); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
updateFields := make(map[string]interface{})
|
||||
for key, value := range after {
|
||||
if !reflect.DeepEqual(before[key], value) {
|
||||
updateFields[key] = value
|
||||
}
|
||||
}
|
||||
if len(updateFields) == 0 {
|
||||
return nil
|
||||
}
|
||||
return s.SetDocumentMetadata(docID, updateFields)
|
||||
}
|
||||
|
||||
func (s *DocumentService) updateDocumentNameOnly(doc *entity.Document, tenantID, newName string) error {
|
||||
if err := s.documentDAO.UpdateByID(doc.ID, map[string]interface{}{"name": newName}); err != nil {
|
||||
return errors.New("Database error (Document rename)!")
|
||||
@@ -3458,18 +3485,8 @@ func (s *DocumentService) BatchUpdateDocumentMetadatas(
|
||||
continue
|
||||
}
|
||||
|
||||
if len(meta) == 0 {
|
||||
if err := s.DeleteDocumentAllMetadata(docID); err != nil {
|
||||
common.Warn("BatchUpdateDocumentMetadata: delete all metadata failed",
|
||||
zap.String("docID", docID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
updated++
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.replaceDocumentMetadata(docID, meta); err != nil {
|
||||
common.Warn("BatchUpdateDocumentMetadata: replace metadata failed",
|
||||
if err := s.patchDocumentMetadata(docID, originalMeta, meta); err != nil {
|
||||
common.Warn("BatchUpdateDocumentMetadata: patch metadata failed",
|
||||
zap.String("docID", docID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -268,7 +268,10 @@ func (m *metadataDocEngine) SearchMetadata(_ context.Context, req *types.SearchM
|
||||
}
|
||||
|
||||
func (m *metadataDocEngine) UpdateMetadata(_ context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error {
|
||||
dup := make(map[string]interface{}, len(metaFields))
|
||||
dup := make(map[string]interface{}, len(m.records[docID])+len(metaFields))
|
||||
for k, v := range m.records[docID] {
|
||||
dup[k] = v
|
||||
}
|
||||
for k, v := range metaFields {
|
||||
dup[k] = v
|
||||
}
|
||||
@@ -279,6 +282,24 @@ func (m *metadataDocEngine) UpdateMetadata(_ context.Context, docID string, data
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *metadataDocEngine) InsertMetadata(_ context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) {
|
||||
for _, doc := range metadata {
|
||||
docID, _ := doc["id"].(string)
|
||||
kbID, _ := doc["kb_id"].(string)
|
||||
metaFields, _ := doc["meta_fields"].(map[string]interface{})
|
||||
if docID == "" || kbID == "" {
|
||||
continue
|
||||
}
|
||||
dup := make(map[string]interface{}, len(metaFields))
|
||||
for k, v := range metaFields {
|
||||
dup[k] = v
|
||||
}
|
||||
m.records[docID] = dup
|
||||
m.docKBs[docID] = kbID
|
||||
}
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
func (m *metadataDocEngine) DeleteMetadata(_ context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
|
||||
docID, _ := condition["id"].(string)
|
||||
if docID == "" {
|
||||
@@ -291,6 +312,33 @@ func (m *metadataDocEngine) DeleteMetadata(_ context.Context, condition map[stri
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (m *metadataDocEngine) DeleteMetadataKeys(_ context.Context, docID string, datasetID string, keys []string, tenantID string) error {
|
||||
meta, ok := m.records[docID]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
for _, key := range keys {
|
||||
delete(meta, key)
|
||||
}
|
||||
if len(meta) == 0 {
|
||||
delete(m.records, docID)
|
||||
return nil
|
||||
}
|
||||
m.records[docID] = meta
|
||||
if _, ok := m.docKBs[docID]; !ok {
|
||||
m.docKBs[docID] = datasetID
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type staleSearchMetadataDocEngine struct {
|
||||
*metadataDocEngine
|
||||
}
|
||||
|
||||
func (m *staleSearchMetadataDocEngine) SearchMetadata(context.Context, *types.SearchMetadataRequest) (*types.SearchMetadataResult, error) {
|
||||
return &types.SearchMetadataResult{MetadataRecords: []map[string]interface{}{}}, nil
|
||||
}
|
||||
|
||||
func (f *failingDeleteMetadataEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
|
||||
return 0, f.deleteErr
|
||||
}
|
||||
@@ -1666,6 +1714,39 @@ func TestUpdateDatasetDocumentPropagatesMetadataDeleteFailure(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDocumentMetadataMergesMetadataRow(t *testing.T) {
|
||||
db := setupServiceTestDB(t)
|
||||
pushServiceDB(t, db)
|
||||
insertTestKB(t, "kb-1", "tenant-1", 1, 0, 0)
|
||||
insertNamedTestDoc(t, "doc-1", "kb-1", "doc.txt", 0, 0)
|
||||
|
||||
engine := newMetadataDocEngine(map[string]map[string]interface{}{
|
||||
"doc-1": {
|
||||
"author": "alice",
|
||||
"year": 2025,
|
||||
},
|
||||
}, map[string]string{"doc-1": "kb-1"})
|
||||
svc := testDocumentService(t)
|
||||
svc.docEngine = engine
|
||||
svc.metadataSvc = &MetadataService{kbDAO: dao.NewKnowledgebaseDAO(), docEngine: engine}
|
||||
|
||||
if err := svc.SetDocumentMetadata("doc-1", map[string]interface{}{"category": "tech", "year": 2026}); err != nil {
|
||||
t.Fatalf("SetDocumentMetadata failed: %v", err)
|
||||
}
|
||||
if got := engine.records["doc-1"]["author"]; got != "alice" {
|
||||
t.Fatalf("author = %#v, want alice", got)
|
||||
}
|
||||
if got := engine.records["doc-1"]["category"]; got != "tech" {
|
||||
t.Fatalf("category = %#v, want tech", got)
|
||||
}
|
||||
if got := engine.records["doc-1"]["year"]; got != 2026 {
|
||||
t.Fatalf("year = %#v, want 2026", got)
|
||||
}
|
||||
if got := engine.docKBs["doc-1"]; got != "kb-1" {
|
||||
t.Fatalf("kb_id = %q, want kb-1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChunkImageStorageKeyUsesImgIDWithDatasetPrefix(t *testing.T) {
|
||||
key, ok := chunkImageStorageKey("kb-1", map[string]interface{}{
|
||||
"id": "chunk-1",
|
||||
@@ -1767,6 +1848,42 @@ func TestBatchUpdateDocumentMetadatasMatchesPythonSemantics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchUpdateDocumentMetadatasDoesNotReplaceWhenCurrentSearchIsStale(t *testing.T) {
|
||||
db := setupServiceTestDB(t)
|
||||
pushServiceDB(t, db)
|
||||
insertTestKB(t, "kb-1", "tenant-1", 1, 0, 0)
|
||||
insertNamedTestDoc(t, "doc-1", "kb-1", "doc1.txt", 0, 0)
|
||||
|
||||
baseEngine := newMetadataDocEngine(map[string]map[string]interface{}{
|
||||
"doc-1": {"author": "alice"},
|
||||
}, map[string]string{"doc-1": "kb-1"})
|
||||
engine := &staleSearchMetadataDocEngine{metadataDocEngine: baseEngine}
|
||||
|
||||
svc := testDocumentService(t)
|
||||
svc.docEngine = engine
|
||||
svc.metadataSvc = &MetadataService{kbDAO: dao.NewKnowledgebaseDAO(), docEngine: engine}
|
||||
|
||||
resp, code, err := svc.BatchUpdateDocumentMetadatas("kb-1", &DocumentMetadataSelector{
|
||||
DocumentIDs: []string{"doc-1"},
|
||||
}, []DocumentMetadataUpdate{
|
||||
{Key: "category", Value: "paper"},
|
||||
}, nil)
|
||||
if err != nil || code != common.CodeSuccess {
|
||||
t.Fatalf("batch update failed: code=%v err=%v", code, err)
|
||||
}
|
||||
if resp.Updated != 1 || resp.MatchedDocs != 1 {
|
||||
t.Fatalf("resp = %#v, want updated=1 matched=1", resp)
|
||||
}
|
||||
|
||||
got := baseEngine.records["doc-1"]
|
||||
if got["author"] != "alice" {
|
||||
t.Fatalf("author should be preserved, got metadata %#v", got)
|
||||
}
|
||||
if got["category"] != "paper" {
|
||||
t.Fatalf("category = %#v, want paper", got["category"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchUpdateDocumentMetadatasDeletesEmptyMetadataAndNoOps(t *testing.T) {
|
||||
db := setupServiceTestDB(t)
|
||||
pushServiceDB(t, db)
|
||||
|
||||
Reference in New Issue
Block a user