fix(elasticsearch): complete Go result functions (#15148)

## Summary
- Complete the Go Elasticsearch result functions that remained stubbed
after #15160.
- Add focused unit coverage for field mapping, aggregation, IDs, and
highlighting behavior.
- Update a stale query-builder test type import discovered during
validation.

## What changed
- Keep the Elasticsearch Go implementation merged in #15160 and fill in
`GetFields`, `GetAggregation`, `GetHighlight`, and `GetDocIDs` in
`internal/engine/elasticsearch/chunk.go`.
- Add regression and invariant coverage in
`internal/engine/elasticsearch/chunk_helpers_test.go`.
- Update `internal/service/nlp/query_builder_test.go` to use the current
`types.MatchTextExpr` type.

## Why
- #15160 implemented the main Go Elasticsearch surface, but
retrieval/tag flows still call result functions that returned stubs.
- Completing these functions keeps Elasticsearch result processing
aligned with the expected document-engine behavior for field extraction,
tag aggregation, doc ID extraction, and snippet highlighting.

## Validation
- `go test ./internal/engine/elasticsearch`
- `GOARCH=arm64 CGO_ENABLED=1 go test ./internal/service/nlp -run
TestQueryBuilder`
- `git diff --check`
- CodeRabbit review reported 0 issues after follow-up fixes.
- Codex Security diff scan found no reportable issues.

## Notes
- This PR is now a follow-up to #15160 rather than a competing
implementation.
- A full local `go test ./internal/service/nlp` run is blocked by local
WordNet resource prerequisites; the query-builder tests touched by this
PR pass with the arm64 CGO path.
This commit is contained in:
ghost
2026-06-09 06:10:11 -06:00
committed by GitHub
parent 10bbe6b5d4
commit 64b860f771
3 changed files with 408 additions and 36 deletions

View File

@@ -18,12 +18,14 @@ package elasticsearch
import (
"bytes"
"cmp"
"context"
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"regexp"
"slices"
"sort"
"strconv"
@@ -41,6 +43,14 @@ var jsonIterator = jsoniter.Config{
SortMapKeys: false,
}.Froze()
var (
elasticsearchHighlightEmTagRE = regexp.MustCompile(`<em>[^<>]+</em>`)
elasticsearchHighlightNewlineRE = regexp.MustCompile(`[\r\n]`)
elasticsearchHighlightDelimiterRE = regexp.MustCompile(`[.?!;\n]`)
elasticsearchLetterRE = regexp.MustCompile(`\pL`)
elasticsearchEnglishLetterRE = regexp.MustCompile(`[A-Za-z]`)
)
// CreateChunkStore creates an index
func (e *elasticsearchEngine) CreateChunkStore(ctx context.Context, baseName, datasetID string, vectorSize int, parserID string) error {
if baseName == "" {
@@ -712,11 +722,12 @@ type SearchResponse struct {
Value int64 `json:"value"`
} `json:"total"`
Hits []struct {
ID string `json:"_id"`
Index string `json:"_index"`
Score float64 `json:"_score"`
Source map[string]interface{} `json:"_source"`
Fields map[string]interface{} `json:"fields"` // ES 9.x stores dense_vector here
ID string `json:"_id"`
Index string `json:"_index"`
Score float64 `json:"_score"`
Source map[string]interface{} `json:"_source"`
Fields map[string]interface{} `json:"fields"` // ES 9.x stores dense_vector here
Highlight map[string]interface{} `json:"highlight,omitempty"`
// Sort is populated when the request body specifies a `sort`
// clause. The last hit's Sort is the cursor for the next
// search_after request — without it, deep pagination can't
@@ -1581,10 +1592,13 @@ func (e *elasticsearchEngine) GetFields(chunks []map[string]interface{}, fields
}
for _, chunk := range chunks {
docID, ok := chunk["_id"].(string)
docID, ok := elasticsearchChunkID(chunk)
if !ok {
continue
}
if id, ok := chunk["id"].(string); !ok || id == "" {
chunk["id"] = docID
}
m := make(map[string]interface{})
for field := range fieldSet {
@@ -1647,23 +1661,56 @@ func (e *elasticsearchEngine) GetAggregation(chunks []map[string]interface{}, fi
return []map[string]interface{}{}
}
counts := make(map[string]int)
tagCounts := make(map[string]int)
for _, chunk := range chunks {
if val, ok := chunk[fieldName]; ok && val != nil {
key := fmt.Sprintf("%v", val)
if key != "" {
counts[key]++
value, ok := chunk[fieldName]
if !ok || value == nil {
continue
}
if valueStr, ok := value.(string); ok {
if valueStr == "" {
continue
}
separator := ","
if fieldName == "tag_kwd" && strings.Contains(valueStr, "###") {
separator = "###"
}
for _, tag := range strings.Split(valueStr, separator) {
countElasticsearchAggregationTag(tagCounts, tag)
}
continue
}
if valueList, ok := value.([]interface{}); ok {
for _, item := range valueList {
if itemStr, ok := item.(string); ok {
countElasticsearchAggregationTag(tagCounts, itemStr)
}
}
}
}
result := make([]map[string]interface{}, 0, len(counts))
for key, count := range counts {
result = append(result, map[string]interface{}{
"key": key,
"count": count,
})
if len(tagCounts) == 0 {
return []map[string]interface{}{}
}
tags := make([]string, 0, len(tagCounts))
for tag := range tagCounts {
tags = append(tags, tag)
}
slices.SortFunc(tags, func(a, b string) int {
if byCount := cmp.Compare(tagCounts[b], tagCounts[a]); byCount != 0 {
return byCount
}
return cmp.Compare(a, b)
})
result := make([]map[string]interface{}, len(tags))
for i, tag := range tags {
result[i] = map[string]interface{}{"key": tag, "count": tagCounts[tag]}
}
return result
}
@@ -1672,7 +1719,7 @@ func (e *elasticsearchEngine) GetAggregation(chunks []map[string]interface{}, fi
func (e *elasticsearchEngine) GetChunkIDs(chunks []map[string]interface{}) []string {
ids := make([]string, 0, len(chunks))
for _, chunk := range chunks {
if id, ok := chunk["_id"].(string); ok {
if id, ok := elasticsearchChunkID(chunk); ok {
ids = append(ids, id)
}
}
@@ -1686,35 +1733,135 @@ func (e *elasticsearchEngine) GetHighlight(chunks []map[string]interface{}, keyw
return result
}
normalizedKeywords := normalizeElasticsearchHighlightKeywords(keywords)
englishPatterns := compileElasticsearchHighlightPatterns(normalizedKeywords)
nonEnglishPattern := compileElasticsearchNonEnglishHighlightPattern(normalizedKeywords)
for _, chunk := range chunks {
docID, ok := chunk["_id"].(string)
docID, ok := elasticsearchChunkID(chunk)
if !ok {
continue
}
highlight, ok := chunk["highlight"].(map[string]interface{})
if !ok || len(highlight) == 0 {
if highlightText := firstElasticsearchHighlight(chunk); highlightText != "" {
result[docID] = highlightText
continue
}
// Get first highlight entry
var highlightText string
for _, vals := range highlight {
if arr, ok := vals.([]interface{}); ok && len(arr) > 0 {
if str, ok := arr[0].(string); ok {
highlightText = str
txt, ok := chunk[fieldName].(string)
if fieldName == "content_with_weight" && (!ok || txt == "") {
txt, ok = chunk["content"].(string)
}
if !ok || txt == "" {
continue
}
if elasticsearchHighlightEmTagRE.MatchString(txt) {
result[docID] = txt
continue
}
txt = elasticsearchHighlightNewlineRE.ReplaceAllString(txt, " ")
segments := elasticsearchHighlightDelimiterRE.Split(txt, -1)
var highlightedSegments []string
for _, segment := range segments {
segmentToCheck := segment
if isMostlyEnglishElasticsearchSegment(segment) {
for _, pattern := range englishPatterns {
segmentToCheck = pattern.ReplaceAllString(segmentToCheck, "$1<em>$2</em>$3")
}
break
} else if nonEnglishPattern != nil {
segmentToCheck = nonEnglishPattern.ReplaceAllStringFunc(segmentToCheck, func(match string) string {
return "<em>" + match + "</em>"
})
}
if segmentToCheck != segment {
highlightedSegments = append(highlightedSegments, strings.TrimSpace(segmentToCheck))
}
}
if highlightText != "" {
result[docID] = highlightText
if len(highlightedSegments) > 0 {
result[docID] = strings.Join(highlightedSegments, "... ")
}
}
return result
}
func elasticsearchChunkID(chunk map[string]interface{}) (string, bool) {
if id, ok := chunk["id"].(string); ok && id != "" {
return id, true
}
if id, ok := chunk["_id"].(string); ok && id != "" {
return id, true
}
return "", false
}
func firstElasticsearchHighlight(chunk map[string]interface{}) string {
highlight, ok := chunk["highlight"].(map[string]interface{})
if !ok || len(highlight) == 0 {
return ""
}
for _, vals := range highlight {
if arr, ok := vals.([]interface{}); ok && len(arr) > 0 {
if str, ok := arr[0].(string); ok {
return str
}
}
}
return ""
}
func countElasticsearchAggregationTag(counts map[string]int, tag string) {
if tag = strings.TrimSpace(tag); tag != "" {
counts[tag]++
}
}
func isMostlyEnglishElasticsearchSegment(segment string) bool {
totalCount := len(elasticsearchLetterRE.FindAllString(segment, -1))
return totalCount > 0 && float64(len(elasticsearchEnglishLetterRE.FindAllString(segment, -1)))/float64(totalCount) > 0.5
}
func compileElasticsearchHighlightPatterns(keywords []string) []*regexp.Regexp {
patterns := make([]*regexp.Regexp, 0, len(keywords))
for _, kw := range keywords {
patterns = append(patterns, regexp.MustCompile(`(?i)(^|[ .?/'\"\(\)!,:;-])(`+regexp.QuoteMeta(kw)+`)([ .?/'\"\(\)!,:;-]|$)`))
}
return patterns
}
func compileElasticsearchNonEnglishHighlightPattern(keywords []string) *regexp.Regexp {
if len(keywords) == 0 {
return nil
}
parts := make([]string, 0, len(keywords))
for _, kw := range keywords {
parts = append(parts, regexp.QuoteMeta(kw))
}
return regexp.MustCompile(strings.Join(parts, "|"))
}
func normalizeElasticsearchHighlightKeywords(keywords []string) []string {
seen := make(map[string]struct{}, len(keywords))
normalized := make([]string, 0, len(keywords))
for _, kw := range keywords {
if kw == "" {
continue
}
if _, ok := seen[kw]; !ok {
seen[kw] = struct{}{}
normalized = append(normalized, kw)
}
}
slices.SortStableFunc(normalized, func(a, b string) int {
return cmp.Compare(len(b), len(a))
})
return normalized
}
// DropChunkStore deletes a chunk index
func (e *elasticsearchEngine) DropChunkStore(ctx context.Context, baseName, datasetID string) error {
return e.dropIndex(ctx, baseName)
@@ -2088,9 +2235,15 @@ func convertESResponse(esResp *SearchResponse, vectorFieldName string) []map[str
chunks := make([]map[string]interface{}, len(esResp.Hits.Hits))
for i, hit := range esResp.Hits.Hits {
chunks[i] = hit.Source
if chunks[i] == nil {
chunks[i] = make(map[string]interface{})
}
chunks[i]["_score"] = hit.Score
chunks[i]["_id"] = hit.ID
chunks[i]["_index"] = hit.Index
if len(hit.Highlight) > 0 {
chunks[i]["highlight"] = hit.Highlight
}
}
return chunks
}

View File

@@ -0,0 +1,218 @@
package elasticsearch
import (
"reflect"
"strings"
"testing"
)
func TestElasticsearchGetFieldsFiltersAndUsesIDFallback(t *testing.T) {
engine := &elasticsearchEngine{}
chunks := []map[string]interface{}{
{
"_id": "fallback-chunk",
"docnm_kwd": []interface{}{"guide.md"},
"content_with_weight": "Alpha beta body.",
"available_int": float64(1),
"ignored": "not requested",
},
}
got := engine.GetFields(chunks, []string{"id", "docnm_kwd", "content_with_weight", "available_int"})
fieldMap, ok := got["fallback-chunk"]
if !ok {
t.Fatalf("GetFields keys=%v, want fallback-chunk", got)
}
assertEqual(t, fieldMap["id"], "fallback-chunk")
assertEqual(t, fieldMap["docnm_kwd"], "guide.md")
assertEqual(t, fieldMap["content_with_weight"], "Alpha beta body.")
assertEqual(t, fieldMap["available_int"], float64(1))
if _, ok := fieldMap["ignored"]; ok {
t.Fatalf("field filter leaked unrequested field: %#v", fieldMap)
}
}
func TestElasticsearchGetFieldsEmptyAndSkippedIDs(t *testing.T) {
engine := &elasticsearchEngine{}
if got := engine.GetFields(nil, nil); got == nil || len(got) != 0 {
t.Fatalf("GetFields(nil)=%#v, want empty non-nil map", got)
}
got := engine.GetFields([]map[string]interface{}{
{"id": "chunk-1", "docnm_kwd": "doc.md"},
{"id": "", "_id": "fallback-chunk", "docnm_kwd": "fallback.md"},
{"docnm": "missing-id.md"},
}, []string{"id", "docnm_kwd"})
fieldMap, ok := got["chunk-1"]
if !ok {
t.Fatalf("GetFields keys=%v, want chunk-1", got)
}
assertEqual(t, fieldMap["id"], "chunk-1")
assertEqual(t, fieldMap["docnm_kwd"], "doc.md")
if _, ok := got["missing-id.md"]; ok {
t.Fatalf("chunk without id should be skipped: %#v", got)
}
if fallbackMap, ok := got["fallback-chunk"]; !ok {
t.Fatalf("GetFields keys=%v, want fallback-chunk", got)
} else {
assertEqual(t, fallbackMap["id"], "fallback-chunk")
assertEqual(t, fallbackMap["docnm_kwd"], "fallback.md")
}
}
func TestElasticsearchGetAggregationSplitsCountsAndSorts(t *testing.T) {
engine := &elasticsearchEngine{}
chunks := []map[string]interface{}{
{"tag_kwd": "red###blue###"},
{"tag_kwd": []interface{}{"blue", " green ", ""}},
{"tag_kwd": "blue"},
{"tag_kwd": ""},
{},
}
got := engine.GetAggregation(chunks, "tag_kwd")
assertEqual(t, aggregationCounts(t, got), map[string]int{"blue": 3, "red": 1, "green": 1})
if len(got) == 0 || got[0]["key"] != "blue" || got[0]["count"] != 3 {
t.Fatalf("first aggregation=%#v, want blue count 3", got)
}
if len(got) != 3 || got[1]["key"] != "green" || got[2]["key"] != "red" {
t.Fatalf("tie ordering=%#v, want green before red", got)
}
docAgg := engine.GetAggregation([]map[string]interface{}{
{"docnm_kwd": "guide.md, api.md"},
{"docnm_kwd": "guide.md"},
}, "docnm_kwd")
assertEqual(t, aggregationCounts(t, docAgg), map[string]int{"guide.md": 2, "api.md": 1})
if got := engine.GetAggregation(chunks, "missing_kwd"); got == nil || len(got) != 0 {
t.Fatalf("missing aggregation=%#v, want empty non-nil slice", got)
}
}
func TestElasticsearchGetChunkIDsPreservesOrderWithFallback(t *testing.T) {
engine := &elasticsearchEngine{}
chunks := []map[string]interface{}{
{"id": "source-id", "_id": "hit-id"},
{"_id": "fallback-id"},
{"id": ""},
{"id": 42},
{"id": "last-id"},
}
got := engine.GetChunkIDs(chunks)
assertEqual(t, got, []string{"source-id", "fallback-id", "last-id"})
if got := engine.GetChunkIDs(nil); got == nil || len(got) != 0 {
t.Fatalf("GetChunkIDs(nil)=%#v, want empty non-nil slice", got)
}
}
func TestElasticsearchGetHighlightFallbackAndBoundaries(t *testing.T) {
engine := &elasticsearchEngine{}
chunks := []map[string]interface{}{
{
"_id": "fallback-id",
"content": "Alpha beta.\nbetamax soup. BETA again!",
},
}
got := engine.GetHighlight(chunks, []string{"beta"}, "content_with_weight")
assertEqual(t, got, map[string]string{
"fallback-id": "Alpha <em>beta</em>... <em>BETA</em> again",
})
if gotText := got["fallback-id"]; strings.Contains(gotText, "<em>beta</em>max") {
t.Fatalf("highlight matched inside a larger token: %q", gotText)
}
gotLaterFallback := engine.GetHighlight([]map[string]interface{}{
{"_id": "first"},
{"_id": "second", "content": "Gamma beta."},
}, []string{"beta"}, "content_with_weight")
assertEqual(t, gotLaterFallback, map[string]string{
"second": "Gamma <em>beta</em>",
})
gotMixedFallback := engine.GetHighlight([]map[string]interface{}{
{"id": "weighted", "content_with_weight": "Weighted beta."},
{"id": "plain", "content": "Plain beta."},
}, []string{"beta"}, "content_with_weight")
assertEqual(t, gotMixedFallback, map[string]string{
"plain": "Plain <em>beta</em>",
"weighted": "Weighted <em>beta</em>",
})
gotEmptyFallback := engine.GetHighlight([]map[string]interface{}{
{"id": "empty-weighted", "content_with_weight": "", "content": "Empty fallback beta."},
}, []string{"beta"}, "content_with_weight")
assertEqual(t, gotEmptyFallback, map[string]string{
"empty-weighted": "Empty fallback <em>beta</em>",
})
gotInvalidFallback := engine.GetHighlight([]map[string]interface{}{
{"id": "invalid-weighted", "content_with_weight": nil, "content": "Invalid fallback beta."},
}, []string{"beta"}, "content_with_weight")
assertEqual(t, gotInvalidFallback, map[string]string{
"invalid-weighted": "Invalid fallback <em>beta</em>",
})
}
func TestElasticsearchGetHighlightPreservesExistingAndNonEnglish(t *testing.T) {
engine := &elasticsearchEngine{}
gotExisting := engine.GetHighlight([]map[string]interface{}{
{"id": "existing", "content_with_weight": "already <em>marked</em> text"},
}, []string{"marked"}, "content_with_weight")
assertEqual(t, gotExisting, map[string]string{"existing": "already <em>marked</em> text"})
gotNonEnglish := engine.GetHighlight([]map[string]interface{}{
{"id": "cn", "content_with_weight": "这是世界。你好世界"},
}, []string{"世界"}, "content_with_weight")
assertEqual(t, gotNonEnglish, map[string]string{
"cn": "这是<em>世界</em>。你好<em>世界</em>",
})
gotOverlapping := engine.GetHighlight([]map[string]interface{}{
{"id": "overlap", "content_with_weight": "世界和世"},
}, []string{"世", "世界", "世界"}, "content_with_weight")
assertEqual(t, gotOverlapping, map[string]string{
"overlap": "<em>世界</em>和<em>世</em>",
})
if got := engine.GetHighlight([]map[string]interface{}{{"id": "x"}}, []string{"x"}, "content_with_weight"); got == nil || len(got) != 0 {
t.Fatalf("missing field highlight=%#v, want empty non-nil map", got)
}
if got := engine.GetHighlight([]map[string]interface{}{{"id": "x", "content": "x"}}, nil, "content"); got == nil || len(got) != 0 {
t.Fatalf("empty keyword highlight=%#v, want empty non-nil map", got)
}
}
func aggregationCounts(t *testing.T, aggregation []map[string]interface{}) map[string]int {
t.Helper()
counts := make(map[string]int, len(aggregation))
for _, item := range aggregation {
key, ok := item["key"].(string)
if !ok {
t.Fatalf("aggregation key type=%T in %#v", item["key"], item)
}
count, ok := item["count"].(int)
if !ok {
t.Fatalf("aggregation count type=%T in %#v", item["count"], item)
}
counts[key] = count
}
return counts
}
func assertEqual(t *testing.T, got, want interface{}) {
t.Helper()
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %#v, want %#v", got, want)
}
}

View File

@@ -35,12 +35,13 @@ func makeResponse(n int, startID int, total int64) SearchResponse {
return resp
}
hits := make([]struct {
ID string `json:"_id"`
Index string `json:"_index"`
Score float64 `json:"_score"`
Source map[string]interface{} `json:"_source"`
Fields map[string]interface{} `json:"fields"`
Sort []interface{} `json:"sort,omitempty"`
ID string `json:"_id"`
Index string `json:"_index"`
Score float64 `json:"_score"`
Source map[string]interface{} `json:"_source"`
Fields map[string]interface{} `json:"fields"`
Highlight map[string]interface{} `json:"highlight,omitempty"`
Sort []interface{} `json:"sort,omitempty"`
}, n)
for i := 0; i < n; i++ {
id := startID + i