From ee45c97b0b8da635e1659909c67d8aa0a8d9aded Mon Sep 17 00:00:00 2001 From: Haruko386 Date: Thu, 2 Jul 2026 12:06:05 +0800 Subject: [PATCH] fix: unadble to add metadata for file in kb (#16523) ### Summary As title Before, it return `update success` but never insert or update any metadata fixed: ```go _, err = s.docEngine.InsertMetadata(nil, []map[string]interface{}{ { "id": docID, "kb_id": doc.KbID, "meta_fields": meta, }, }, tenantID) ``` --- internal/engine/elasticsearch/metadata.go | 24 ++++- internal/service/document.go | 47 ++++++--- internal/service/document_test.go | 119 +++++++++++++++++++++- 3 files changed, 171 insertions(+), 19 deletions(-) diff --git a/internal/engine/elasticsearch/metadata.go b/internal/engine/elasticsearch/metadata.go index bbf83540e4..a7395f5dd0 100644 --- a/internal/engine/elasticsearch/metadata.go +++ b/internal/engine/elasticsearch/metadata.go @@ -136,7 +136,7 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map // Execute bulk request req := esapi.BulkRequest{ Body: bytes.NewReader(buf.Bytes()), - Refresh: "false", + Refresh: "wait_for", } res, err := req.Do(ctx, e.client) @@ -193,14 +193,14 @@ func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, } // Build the document ID for update - docID = strings.ReplaceAll(docID, "'", "''") + docIDStr := strings.ReplaceAll(docID, "'", "''") datasetIDStr := strings.ReplaceAll(datasetID, "'", "''") // Build update body - merge meta_fields with existing query := map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ - {"term": map[string]interface{}{"id": docID}}, + {"term": map[string]interface{}{"id": docIDStr}}, {"term": map[string]interface{}{"kb_id": datasetIDStr}}, }, }, @@ -243,6 +243,24 @@ func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, return fmt.Errorf("elasticsearch update by query returned error: %s", res.Status()) } + var updateResponse map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&updateResponse); err != nil { + common.Error("Failed to parse update response", err) + return fmt.Errorf("failed to parse update response: %w", err) + } + if total, ok := updateResponse["total"].(float64); ok && total == 0 { + _, err := e.InsertMetadata(ctx, []map[string]interface{}{ + { + "id": docID, + "kb_id": datasetID, + "meta_fields": metaFields, + }, + }, tenantID) + if err != nil { + return fmt.Errorf("failed to insert metadata: %w", err) + } + } + common.Info("ElasticsearchConnection.UpdateMetadata completes", zap.String("index_name", indexName), zap.String("docID", docID)) return nil } diff --git a/internal/service/document.go b/internal/service/document.go index 50c0b56e34..8165898759 100644 --- a/internal/service/document.go +++ b/internal/service/document.go @@ -1972,9 +1972,7 @@ func (s *DocumentService) SetDocumentMetadata(docID string, meta map[string]inte return fmt.Errorf("failed to get tenant ID: %w", err) } - // Update metadata using the document engine (merges with existing) - err = s.docEngine.UpdateMetadata(nil, docID, doc.KbID, meta, tenantID) - if err != nil { + if err := s.docEngine.UpdateMetadata(context.Background(), docID, doc.KbID, meta, tenantID); err != nil { return fmt.Errorf("failed to update metadata: %w", err) } @@ -2652,6 +2650,35 @@ func (s *DocumentService) replaceDocumentMetadata(docID string, meta map[string] return s.SetDocumentMetadata(docID, map[string]interface{}(meta)) } +func (s *DocumentService) patchDocumentMetadata(docID string, before, after map[string]interface{}) error { + if s.docEngine == nil || s.metadataSvc == nil { + return nil + } + + deleteKeys := make([]string, 0) + for key := range before { + if _, ok := after[key]; !ok { + deleteKeys = append(deleteKeys, key) + } + } + if len(deleteKeys) > 0 { + if err := s.DeleteDocumentMetadata(docID, deleteKeys); err != nil { + return err + } + } + + updateFields := make(map[string]interface{}) + for key, value := range after { + if !reflect.DeepEqual(before[key], value) { + updateFields[key] = value + } + } + if len(updateFields) == 0 { + return nil + } + return s.SetDocumentMetadata(docID, updateFields) +} + func (s *DocumentService) updateDocumentNameOnly(doc *entity.Document, tenantID, newName string) error { if err := s.documentDAO.UpdateByID(doc.ID, map[string]interface{}{"name": newName}); err != nil { return errors.New("Database error (Document rename)!") @@ -3458,18 +3485,8 @@ func (s *DocumentService) BatchUpdateDocumentMetadatas( continue } - if len(meta) == 0 { - if err := s.DeleteDocumentAllMetadata(docID); err != nil { - common.Warn("BatchUpdateDocumentMetadata: delete all metadata failed", - zap.String("docID", docID), zap.Error(err)) - continue - } - updated++ - continue - } - - if err := s.replaceDocumentMetadata(docID, meta); err != nil { - common.Warn("BatchUpdateDocumentMetadata: replace metadata failed", + if err := s.patchDocumentMetadata(docID, originalMeta, meta); err != nil { + common.Warn("BatchUpdateDocumentMetadata: patch metadata failed", zap.String("docID", docID), zap.Error(err)) continue } diff --git a/internal/service/document_test.go b/internal/service/document_test.go index e93510fb66..1237b5f875 100644 --- a/internal/service/document_test.go +++ b/internal/service/document_test.go @@ -268,7 +268,10 @@ func (m *metadataDocEngine) SearchMetadata(_ context.Context, req *types.SearchM } func (m *metadataDocEngine) UpdateMetadata(_ context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error { - dup := make(map[string]interface{}, len(metaFields)) + dup := make(map[string]interface{}, len(m.records[docID])+len(metaFields)) + for k, v := range m.records[docID] { + dup[k] = v + } for k, v := range metaFields { dup[k] = v } @@ -279,6 +282,24 @@ func (m *metadataDocEngine) UpdateMetadata(_ context.Context, docID string, data return nil } +func (m *metadataDocEngine) InsertMetadata(_ context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) { + for _, doc := range metadata { + docID, _ := doc["id"].(string) + kbID, _ := doc["kb_id"].(string) + metaFields, _ := doc["meta_fields"].(map[string]interface{}) + if docID == "" || kbID == "" { + continue + } + dup := make(map[string]interface{}, len(metaFields)) + for k, v := range metaFields { + dup[k] = v + } + m.records[docID] = dup + m.docKBs[docID] = kbID + } + return []string{}, nil +} + func (m *metadataDocEngine) DeleteMetadata(_ context.Context, condition map[string]interface{}, tenantID string) (int64, error) { docID, _ := condition["id"].(string) if docID == "" { @@ -291,6 +312,33 @@ func (m *metadataDocEngine) DeleteMetadata(_ context.Context, condition map[stri return 0, nil } +func (m *metadataDocEngine) DeleteMetadataKeys(_ context.Context, docID string, datasetID string, keys []string, tenantID string) error { + meta, ok := m.records[docID] + if !ok { + return nil + } + for _, key := range keys { + delete(meta, key) + } + if len(meta) == 0 { + delete(m.records, docID) + return nil + } + m.records[docID] = meta + if _, ok := m.docKBs[docID]; !ok { + m.docKBs[docID] = datasetID + } + return nil +} + +type staleSearchMetadataDocEngine struct { + *metadataDocEngine +} + +func (m *staleSearchMetadataDocEngine) SearchMetadata(context.Context, *types.SearchMetadataRequest) (*types.SearchMetadataResult, error) { + return &types.SearchMetadataResult{MetadataRecords: []map[string]interface{}{}}, nil +} + func (f *failingDeleteMetadataEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) { return 0, f.deleteErr } @@ -1666,6 +1714,39 @@ func TestUpdateDatasetDocumentPropagatesMetadataDeleteFailure(t *testing.T) { } } +func TestSetDocumentMetadataMergesMetadataRow(t *testing.T) { + db := setupServiceTestDB(t) + pushServiceDB(t, db) + insertTestKB(t, "kb-1", "tenant-1", 1, 0, 0) + insertNamedTestDoc(t, "doc-1", "kb-1", "doc.txt", 0, 0) + + engine := newMetadataDocEngine(map[string]map[string]interface{}{ + "doc-1": { + "author": "alice", + "year": 2025, + }, + }, map[string]string{"doc-1": "kb-1"}) + svc := testDocumentService(t) + svc.docEngine = engine + svc.metadataSvc = &MetadataService{kbDAO: dao.NewKnowledgebaseDAO(), docEngine: engine} + + if err := svc.SetDocumentMetadata("doc-1", map[string]interface{}{"category": "tech", "year": 2026}); err != nil { + t.Fatalf("SetDocumentMetadata failed: %v", err) + } + if got := engine.records["doc-1"]["author"]; got != "alice" { + t.Fatalf("author = %#v, want alice", got) + } + if got := engine.records["doc-1"]["category"]; got != "tech" { + t.Fatalf("category = %#v, want tech", got) + } + if got := engine.records["doc-1"]["year"]; got != 2026 { + t.Fatalf("year = %#v, want 2026", got) + } + if got := engine.docKBs["doc-1"]; got != "kb-1" { + t.Fatalf("kb_id = %q, want kb-1", got) + } +} + func TestChunkImageStorageKeyUsesImgIDWithDatasetPrefix(t *testing.T) { key, ok := chunkImageStorageKey("kb-1", map[string]interface{}{ "id": "chunk-1", @@ -1767,6 +1848,42 @@ func TestBatchUpdateDocumentMetadatasMatchesPythonSemantics(t *testing.T) { } } +func TestBatchUpdateDocumentMetadatasDoesNotReplaceWhenCurrentSearchIsStale(t *testing.T) { + db := setupServiceTestDB(t) + pushServiceDB(t, db) + insertTestKB(t, "kb-1", "tenant-1", 1, 0, 0) + insertNamedTestDoc(t, "doc-1", "kb-1", "doc1.txt", 0, 0) + + baseEngine := newMetadataDocEngine(map[string]map[string]interface{}{ + "doc-1": {"author": "alice"}, + }, map[string]string{"doc-1": "kb-1"}) + engine := &staleSearchMetadataDocEngine{metadataDocEngine: baseEngine} + + svc := testDocumentService(t) + svc.docEngine = engine + svc.metadataSvc = &MetadataService{kbDAO: dao.NewKnowledgebaseDAO(), docEngine: engine} + + resp, code, err := svc.BatchUpdateDocumentMetadatas("kb-1", &DocumentMetadataSelector{ + DocumentIDs: []string{"doc-1"}, + }, []DocumentMetadataUpdate{ + {Key: "category", Value: "paper"}, + }, nil) + if err != nil || code != common.CodeSuccess { + t.Fatalf("batch update failed: code=%v err=%v", code, err) + } + if resp.Updated != 1 || resp.MatchedDocs != 1 { + t.Fatalf("resp = %#v, want updated=1 matched=1", resp) + } + + got := baseEngine.records["doc-1"] + if got["author"] != "alice" { + t.Fatalf("author should be preserved, got metadata %#v", got) + } + if got["category"] != "paper" { + t.Fatalf("category = %#v, want paper", got["category"]) + } +} + func TestBatchUpdateDocumentMetadatasDeletesEmptyMetadataAndNoOps(t *testing.T) { db := setupServiceTestDB(t) pushServiceDB(t, db)