From 64b860f7711b4b6c6644ce02ac69b05ffc6d030d Mon Sep 17 00:00:00 2001
From: ghost <49853598+JSONbored@users.noreply.github.com>
Date: Tue, 9 Jun 2026 06:10:11 -0600
Subject: [PATCH] fix(elasticsearch): complete Go result functions (#15148)
## Summary
- Complete the Go Elasticsearch result functions that remained stubbed
after #15160.
- Add focused unit coverage for field mapping, aggregation, IDs, and
highlighting behavior.
- Update a stale query-builder test type import discovered during
validation.
## What changed
- Keep the Elasticsearch Go implementation merged in #15160 and fill in
`GetFields`, `GetAggregation`, `GetHighlight`, and `GetChunkIDs` in
`internal/engine/elasticsearch/chunk.go`.
- Add regression and invariant coverage in
`internal/engine/elasticsearch/chunk_helpers_test.go`.
- Update `internal/service/nlp/query_builder_test.go` to use the current
`types.MatchTextExpr` type.
## Why
- #15160 implemented the main Go Elasticsearch surface, but
retrieval/tag flows still call result functions that returned stubs.
- Completing these functions keeps Elasticsearch result processing
aligned with the expected document-engine behavior for field extraction,
tag aggregation, doc ID extraction, and snippet highlighting.
## Validation
- `go test ./internal/engine/elasticsearch`
- `GOARCH=arm64 CGO_ENABLED=1 go test ./internal/service/nlp -run
TestQueryBuilder`
- `git diff --check`
- CodeRabbit review reported 0 issues after follow-up fixes.
- Codex Security diff scan found no reportable issues.
## Notes
- This PR is now a follow-up to #15160 rather than a competing
implementation.
- A full local `go test ./internal/service/nlp` run is blocked by local
WordNet resource prerequisites; the query-builder tests touched by this
PR pass with the arm64 CGO path.
---
internal/engine/elasticsearch/chunk.go | 213 ++++++++++++++---
.../elasticsearch/chunk_helpers_test.go | 218 ++++++++++++++++++
internal/engine/elasticsearch/chunk_test.go | 13 +-
3 files changed, 408 insertions(+), 36 deletions(-)
create mode 100644 internal/engine/elasticsearch/chunk_helpers_test.go
diff --git a/internal/engine/elasticsearch/chunk.go b/internal/engine/elasticsearch/chunk.go
index 75fab242f5..c0fc803392 100644
--- a/internal/engine/elasticsearch/chunk.go
+++ b/internal/engine/elasticsearch/chunk.go
@@ -18,12 +18,14 @@ package elasticsearch
import (
"bytes"
+ "cmp"
"context"
"encoding/json"
"fmt"
"io"
"os"
"reflect"
+ "regexp"
"slices"
"sort"
"strconv"
@@ -41,6 +43,14 @@ var jsonIterator = jsoniter.Config{
SortMapKeys: false,
}.Froze()
+var (
+ elasticsearchHighlightEmTagRE = regexp.MustCompile(`<em>[^<>]+</em>`)
+ elasticsearchHighlightNewlineRE = regexp.MustCompile(`[\r\n]`)
+ elasticsearchHighlightDelimiterRE = regexp.MustCompile(`[.?!;\n]`)
+ elasticsearchLetterRE = regexp.MustCompile(`\pL`)
+ elasticsearchEnglishLetterRE = regexp.MustCompile(`[A-Za-z]`)
+)
+
// CreateChunkStore creates an index
func (e *elasticsearchEngine) CreateChunkStore(ctx context.Context, baseName, datasetID string, vectorSize int, parserID string) error {
if baseName == "" {
@@ -712,11 +722,12 @@ type SearchResponse struct {
Value int64 `json:"value"`
} `json:"total"`
Hits []struct {
- ID string `json:"_id"`
- Index string `json:"_index"`
- Score float64 `json:"_score"`
- Source map[string]interface{} `json:"_source"`
- Fields map[string]interface{} `json:"fields"` // ES 9.x stores dense_vector here
+ ID string `json:"_id"`
+ Index string `json:"_index"`
+ Score float64 `json:"_score"`
+ Source map[string]interface{} `json:"_source"`
+ Fields map[string]interface{} `json:"fields"` // ES 9.x stores dense_vector here
+ Highlight map[string]interface{} `json:"highlight,omitempty"`
// Sort is populated when the request body specifies a `sort`
// clause. The last hit's Sort is the cursor for the next
// search_after request — without it, deep pagination can't
@@ -1581,10 +1592,13 @@ func (e *elasticsearchEngine) GetFields(chunks []map[string]interface{}, fields
}
for _, chunk := range chunks {
- docID, ok := chunk["_id"].(string)
+ docID, ok := elasticsearchChunkID(chunk)
if !ok {
continue
}
+ if id, ok := chunk["id"].(string); !ok || id == "" {
+ chunk["id"] = docID
+ }
m := make(map[string]interface{})
for field := range fieldSet {
@@ -1647,23 +1661,56 @@ func (e *elasticsearchEngine) GetAggregation(chunks []map[string]interface{}, fi
return []map[string]interface{}{}
}
- counts := make(map[string]int)
+ tagCounts := make(map[string]int)
for _, chunk := range chunks {
- if val, ok := chunk[fieldName]; ok && val != nil {
- key := fmt.Sprintf("%v", val)
- if key != "" {
- counts[key]++
+ value, ok := chunk[fieldName]
+ if !ok || value == nil {
+ continue
+ }
+
+ if valueStr, ok := value.(string); ok {
+ if valueStr == "" {
+ continue
+ }
+ separator := ","
+ if fieldName == "tag_kwd" && strings.Contains(valueStr, "###") {
+ separator = "###"
+ }
+ for _, tag := range strings.Split(valueStr, separator) {
+ countElasticsearchAggregationTag(tagCounts, tag)
+ }
+ continue
+ }
+
+ if valueList, ok := value.([]interface{}); ok {
+ for _, item := range valueList {
+ if itemStr, ok := item.(string); ok {
+ countElasticsearchAggregationTag(tagCounts, itemStr)
+ }
}
}
}
- result := make([]map[string]interface{}, 0, len(counts))
- for key, count := range counts {
- result = append(result, map[string]interface{}{
- "key": key,
- "count": count,
- })
+ if len(tagCounts) == 0 {
+ return []map[string]interface{}{}
}
+
+ tags := make([]string, 0, len(tagCounts))
+ for tag := range tagCounts {
+ tags = append(tags, tag)
+ }
+ slices.SortFunc(tags, func(a, b string) int {
+ if byCount := cmp.Compare(tagCounts[b], tagCounts[a]); byCount != 0 {
+ return byCount
+ }
+ return cmp.Compare(a, b)
+ })
+
+ result := make([]map[string]interface{}, len(tags))
+ for i, tag := range tags {
+ result[i] = map[string]interface{}{"key": tag, "count": tagCounts[tag]}
+ }
+
return result
}
@@ -1672,7 +1719,7 @@ func (e *elasticsearchEngine) GetAggregation(chunks []map[string]interface{}, fi
func (e *elasticsearchEngine) GetChunkIDs(chunks []map[string]interface{}) []string {
ids := make([]string, 0, len(chunks))
for _, chunk := range chunks {
- if id, ok := chunk["_id"].(string); ok {
+ if id, ok := elasticsearchChunkID(chunk); ok {
ids = append(ids, id)
}
}
@@ -1686,35 +1733,135 @@ func (e *elasticsearchEngine) GetHighlight(chunks []map[string]interface{}, keyw
return result
}
+ normalizedKeywords := normalizeElasticsearchHighlightKeywords(keywords)
+ englishPatterns := compileElasticsearchHighlightPatterns(normalizedKeywords)
+ nonEnglishPattern := compileElasticsearchNonEnglishHighlightPattern(normalizedKeywords)
+
for _, chunk := range chunks {
- docID, ok := chunk["_id"].(string)
+ docID, ok := elasticsearchChunkID(chunk)
if !ok {
continue
}
- highlight, ok := chunk["highlight"].(map[string]interface{})
- if !ok || len(highlight) == 0 {
+ if highlightText := firstElasticsearchHighlight(chunk); highlightText != "" {
+ result[docID] = highlightText
continue
}
- // Get first highlight entry
- var highlightText string
- for _, vals := range highlight {
- if arr, ok := vals.([]interface{}); ok && len(arr) > 0 {
- if str, ok := arr[0].(string); ok {
- highlightText = str
+ txt, ok := chunk[fieldName].(string)
+ if fieldName == "content_with_weight" && (!ok || txt == "") {
+ txt, ok = chunk["content"].(string)
+ }
+ if !ok || txt == "" {
+ continue
+ }
+
+ if elasticsearchHighlightEmTagRE.MatchString(txt) {
+ result[docID] = txt
+ continue
+ }
+
+ txt = elasticsearchHighlightNewlineRE.ReplaceAllString(txt, " ")
+ segments := elasticsearchHighlightDelimiterRE.Split(txt, -1)
+
+ var highlightedSegments []string
+ for _, segment := range segments {
+ segmentToCheck := segment
+ if isMostlyEnglishElasticsearchSegment(segment) {
+ for _, pattern := range englishPatterns {
+ segmentToCheck = pattern.ReplaceAllString(segmentToCheck, "$1<em>$2</em>$3")
}
- break
+ } else if nonEnglishPattern != nil {
+ segmentToCheck = nonEnglishPattern.ReplaceAllStringFunc(segmentToCheck, func(match string) string {
+ return "<em>" + match + "</em>"
+ })
+ }
+ if segmentToCheck != segment {
+ highlightedSegments = append(highlightedSegments, strings.TrimSpace(segmentToCheck))
}
}
- if highlightText != "" {
- result[docID] = highlightText
+ if len(highlightedSegments) > 0 {
+ result[docID] = strings.Join(highlightedSegments, "... ")
}
}
return result
}
+func elasticsearchChunkID(chunk map[string]interface{}) (string, bool) {
+ if id, ok := chunk["id"].(string); ok && id != "" {
+ return id, true
+ }
+ if id, ok := chunk["_id"].(string); ok && id != "" {
+ return id, true
+ }
+ return "", false
+}
+
+func firstElasticsearchHighlight(chunk map[string]interface{}) string {
+ highlight, ok := chunk["highlight"].(map[string]interface{})
+ if !ok || len(highlight) == 0 {
+ return ""
+ }
+
+ for _, vals := range highlight {
+ if arr, ok := vals.([]interface{}); ok && len(arr) > 0 {
+ if str, ok := arr[0].(string); ok {
+ return str
+ }
+ }
+ }
+ return ""
+}
+
+func countElasticsearchAggregationTag(counts map[string]int, tag string) {
+ if tag = strings.TrimSpace(tag); tag != "" {
+ counts[tag]++
+ }
+}
+
+func isMostlyEnglishElasticsearchSegment(segment string) bool {
+ totalCount := len(elasticsearchLetterRE.FindAllString(segment, -1))
+ return totalCount > 0 && float64(len(elasticsearchEnglishLetterRE.FindAllString(segment, -1)))/float64(totalCount) > 0.5
+}
+
+func compileElasticsearchHighlightPatterns(keywords []string) []*regexp.Regexp {
+ patterns := make([]*regexp.Regexp, 0, len(keywords))
+ for _, kw := range keywords {
+ patterns = append(patterns, regexp.MustCompile(`(?i)(^|[ .?/'\"\(\)!,:;-])(`+regexp.QuoteMeta(kw)+`)([ .?/'\"\(\)!,:;-]|$)`))
+ }
+ return patterns
+}
+
+func compileElasticsearchNonEnglishHighlightPattern(keywords []string) *regexp.Regexp {
+ if len(keywords) == 0 {
+ return nil
+ }
+ parts := make([]string, 0, len(keywords))
+ for _, kw := range keywords {
+ parts = append(parts, regexp.QuoteMeta(kw))
+ }
+ return regexp.MustCompile(strings.Join(parts, "|"))
+}
+
+func normalizeElasticsearchHighlightKeywords(keywords []string) []string {
+ seen := make(map[string]struct{}, len(keywords))
+ normalized := make([]string, 0, len(keywords))
+ for _, kw := range keywords {
+ if kw == "" {
+ continue
+ }
+ if _, ok := seen[kw]; !ok {
+ seen[kw] = struct{}{}
+ normalized = append(normalized, kw)
+ }
+ }
+ slices.SortStableFunc(normalized, func(a, b string) int {
+ return cmp.Compare(len(b), len(a))
+ })
+ return normalized
+}
+
// DropChunkStore deletes a chunk index
func (e *elasticsearchEngine) DropChunkStore(ctx context.Context, baseName, datasetID string) error {
return e.dropIndex(ctx, baseName)
@@ -2088,9 +2235,15 @@ func convertESResponse(esResp *SearchResponse, vectorFieldName string) []map[str
chunks := make([]map[string]interface{}, len(esResp.Hits.Hits))
for i, hit := range esResp.Hits.Hits {
chunks[i] = hit.Source
+ if chunks[i] == nil {
+ chunks[i] = make(map[string]interface{})
+ }
chunks[i]["_score"] = hit.Score
chunks[i]["_id"] = hit.ID
chunks[i]["_index"] = hit.Index
+ if len(hit.Highlight) > 0 {
+ chunks[i]["highlight"] = hit.Highlight
+ }
}
return chunks
}
diff --git a/internal/engine/elasticsearch/chunk_helpers_test.go b/internal/engine/elasticsearch/chunk_helpers_test.go
new file mode 100644
index 0000000000..d3d7d9de8a
--- /dev/null
+++ b/internal/engine/elasticsearch/chunk_helpers_test.go
@@ -0,0 +1,218 @@
+package elasticsearch
+
+import (
+ "reflect"
+ "strings"
+ "testing"
+)
+
+func TestElasticsearchGetFieldsFiltersAndUsesIDFallback(t *testing.T) {
+ engine := &elasticsearchEngine{}
+ chunks := []map[string]interface{}{
+ {
+ "_id": "fallback-chunk",
+ "docnm_kwd": []interface{}{"guide.md"},
+ "content_with_weight": "Alpha beta body.",
+ "available_int": float64(1),
+ "ignored": "not requested",
+ },
+ }
+
+ got := engine.GetFields(chunks, []string{"id", "docnm_kwd", "content_with_weight", "available_int"})
+ fieldMap, ok := got["fallback-chunk"]
+ if !ok {
+ t.Fatalf("GetFields keys=%v, want fallback-chunk", got)
+ }
+
+ assertEqual(t, fieldMap["id"], "fallback-chunk")
+ assertEqual(t, fieldMap["docnm_kwd"], "guide.md")
+ assertEqual(t, fieldMap["content_with_weight"], "Alpha beta body.")
+ assertEqual(t, fieldMap["available_int"], float64(1))
+
+ if _, ok := fieldMap["ignored"]; ok {
+ t.Fatalf("field filter leaked unrequested field: %#v", fieldMap)
+ }
+}
+
+func TestElasticsearchGetFieldsEmptyAndSkippedIDs(t *testing.T) {
+ engine := &elasticsearchEngine{}
+
+ if got := engine.GetFields(nil, nil); got == nil || len(got) != 0 {
+ t.Fatalf("GetFields(nil)=%#v, want empty non-nil map", got)
+ }
+
+ got := engine.GetFields([]map[string]interface{}{
+ {"id": "chunk-1", "docnm_kwd": "doc.md"},
+ {"id": "", "_id": "fallback-chunk", "docnm_kwd": "fallback.md"},
+ {"docnm": "missing-id.md"},
+ }, []string{"id", "docnm_kwd"})
+
+ fieldMap, ok := got["chunk-1"]
+ if !ok {
+ t.Fatalf("GetFields keys=%v, want chunk-1", got)
+ }
+ assertEqual(t, fieldMap["id"], "chunk-1")
+ assertEqual(t, fieldMap["docnm_kwd"], "doc.md")
+
+ if _, ok := got["missing-id.md"]; ok {
+ t.Fatalf("chunk without id should be skipped: %#v", got)
+ }
+ if fallbackMap, ok := got["fallback-chunk"]; !ok {
+ t.Fatalf("GetFields keys=%v, want fallback-chunk", got)
+ } else {
+ assertEqual(t, fallbackMap["id"], "fallback-chunk")
+ assertEqual(t, fallbackMap["docnm_kwd"], "fallback.md")
+ }
+}
+
+func TestElasticsearchGetAggregationSplitsCountsAndSorts(t *testing.T) {
+ engine := &elasticsearchEngine{}
+ chunks := []map[string]interface{}{
+ {"tag_kwd": "red###blue###"},
+ {"tag_kwd": []interface{}{"blue", " green ", ""}},
+ {"tag_kwd": "blue"},
+ {"tag_kwd": ""},
+ {},
+ }
+
+ got := engine.GetAggregation(chunks, "tag_kwd")
+ assertEqual(t, aggregationCounts(t, got), map[string]int{"blue": 3, "red": 1, "green": 1})
+ if len(got) == 0 || got[0]["key"] != "blue" || got[0]["count"] != 3 {
+ t.Fatalf("first aggregation=%#v, want blue count 3", got)
+ }
+ if len(got) != 3 || got[1]["key"] != "green" || got[2]["key"] != "red" {
+ t.Fatalf("tie ordering=%#v, want green before red", got)
+ }
+
+ docAgg := engine.GetAggregation([]map[string]interface{}{
+ {"docnm_kwd": "guide.md, api.md"},
+ {"docnm_kwd": "guide.md"},
+ }, "docnm_kwd")
+ assertEqual(t, aggregationCounts(t, docAgg), map[string]int{"guide.md": 2, "api.md": 1})
+
+ if got := engine.GetAggregation(chunks, "missing_kwd"); got == nil || len(got) != 0 {
+ t.Fatalf("missing aggregation=%#v, want empty non-nil slice", got)
+ }
+}
+
+func TestElasticsearchGetChunkIDsPreservesOrderWithFallback(t *testing.T) {
+ engine := &elasticsearchEngine{}
+ chunks := []map[string]interface{}{
+ {"id": "source-id", "_id": "hit-id"},
+ {"_id": "fallback-id"},
+ {"id": ""},
+ {"id": 42},
+ {"id": "last-id"},
+ }
+
+ got := engine.GetChunkIDs(chunks)
+ assertEqual(t, got, []string{"source-id", "fallback-id", "last-id"})
+
+ if got := engine.GetChunkIDs(nil); got == nil || len(got) != 0 {
+ t.Fatalf("GetChunkIDs(nil)=%#v, want empty non-nil slice", got)
+ }
+}
+
+func TestElasticsearchGetHighlightFallbackAndBoundaries(t *testing.T) {
+ engine := &elasticsearchEngine{}
+ chunks := []map[string]interface{}{
+ {
+ "_id": "fallback-id",
+ "content": "Alpha beta.\nbetamax soup. BETA again!",
+ },
+ }
+
+ got := engine.GetHighlight(chunks, []string{"beta"}, "content_with_weight")
+ assertEqual(t, got, map[string]string{
+ "fallback-id": "Alpha <em>beta</em>... <em>BETA</em> again",
+ })
+ if gotText := got["fallback-id"]; strings.Contains(gotText, "betamax") {
+ t.Fatalf("highlight matched inside a larger token: %q", gotText)
+ }
+
+ gotLaterFallback := engine.GetHighlight([]map[string]interface{}{
+ {"_id": "first"},
+ {"_id": "second", "content": "Gamma beta."},
+ }, []string{"beta"}, "content_with_weight")
+ assertEqual(t, gotLaterFallback, map[string]string{
+ "second": "Gamma <em>beta</em>",
+ })
+
+ gotMixedFallback := engine.GetHighlight([]map[string]interface{}{
+ {"id": "weighted", "content_with_weight": "Weighted beta."},
+ {"id": "plain", "content": "Plain beta."},
+ }, []string{"beta"}, "content_with_weight")
+ assertEqual(t, gotMixedFallback, map[string]string{
+ "plain": "Plain <em>beta</em>",
+ "weighted": "Weighted <em>beta</em>",
+ })
+
+ gotEmptyFallback := engine.GetHighlight([]map[string]interface{}{
+ {"id": "empty-weighted", "content_with_weight": "", "content": "Empty fallback beta."},
+ }, []string{"beta"}, "content_with_weight")
+ assertEqual(t, gotEmptyFallback, map[string]string{
+ "empty-weighted": "Empty fallback <em>beta</em>",
+ })
+
+ gotInvalidFallback := engine.GetHighlight([]map[string]interface{}{
+ {"id": "invalid-weighted", "content_with_weight": nil, "content": "Invalid fallback beta."},
+ }, []string{"beta"}, "content_with_weight")
+ assertEqual(t, gotInvalidFallback, map[string]string{
+ "invalid-weighted": "Invalid fallback <em>beta</em>",
+ })
+}
+
+func TestElasticsearchGetHighlightPreservesExistingAndNonEnglish(t *testing.T) {
+ engine := &elasticsearchEngine{}
+
+ gotExisting := engine.GetHighlight([]map[string]interface{}{
+ {"id": "existing", "content_with_weight": "already <em>marked</em> text"},
+ }, []string{"marked"}, "content_with_weight")
+ assertEqual(t, gotExisting, map[string]string{"existing": "already <em>marked</em> text"})
+
+ gotNonEnglish := engine.GetHighlight([]map[string]interface{}{
+ {"id": "cn", "content_with_weight": "这是世界。你好世界"},
+ }, []string{"世界"}, "content_with_weight")
+ assertEqual(t, gotNonEnglish, map[string]string{
+ "cn": "这是<em>世界</em>。你好<em>世界</em>",
+ })
+
+ gotOverlapping := engine.GetHighlight([]map[string]interface{}{
+ {"id": "overlap", "content_with_weight": "世界和世"},
+ }, []string{"世", "世界", "世界"}, "content_with_weight")
+ assertEqual(t, gotOverlapping, map[string]string{
+ "overlap": "<em>世界</em>和<em>世</em>",
+ })
+
+ if got := engine.GetHighlight([]map[string]interface{}{{"id": "x"}}, []string{"x"}, "content_with_weight"); got == nil || len(got) != 0 {
+ t.Fatalf("missing field highlight=%#v, want empty non-nil map", got)
+ }
+ if got := engine.GetHighlight([]map[string]interface{}{{"id": "x", "content": "x"}}, nil, "content"); got == nil || len(got) != 0 {
+ t.Fatalf("empty keyword highlight=%#v, want empty non-nil map", got)
+ }
+}
+
+func aggregationCounts(t *testing.T, aggregation []map[string]interface{}) map[string]int {
+ t.Helper()
+
+ counts := make(map[string]int, len(aggregation))
+ for _, item := range aggregation {
+ key, ok := item["key"].(string)
+ if !ok {
+ t.Fatalf("aggregation key type=%T in %#v", item["key"], item)
+ }
+ count, ok := item["count"].(int)
+ if !ok {
+ t.Fatalf("aggregation count type=%T in %#v", item["count"], item)
+ }
+ counts[key] = count
+ }
+ return counts
+}
+
+func assertEqual(t *testing.T, got, want interface{}) {
+ t.Helper()
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("got %#v, want %#v", got, want)
+ }
+}
diff --git a/internal/engine/elasticsearch/chunk_test.go b/internal/engine/elasticsearch/chunk_test.go
index 751ec837eb..34313d2895 100644
--- a/internal/engine/elasticsearch/chunk_test.go
+++ b/internal/engine/elasticsearch/chunk_test.go
@@ -35,12 +35,13 @@ func makeResponse(n int, startID int, total int64) SearchResponse {
return resp
}
hits := make([]struct {
- ID string `json:"_id"`
- Index string `json:"_index"`
- Score float64 `json:"_score"`
- Source map[string]interface{} `json:"_source"`
- Fields map[string]interface{} `json:"fields"`
- Sort []interface{} `json:"sort,omitempty"`
+ ID string `json:"_id"`
+ Index string `json:"_index"`
+ Score float64 `json:"_score"`
+ Source map[string]interface{} `json:"_source"`
+ Fields map[string]interface{} `json:"fields"`
+ Highlight map[string]interface{} `json:"highlight,omitempty"`
+ Sort []interface{} `json:"sort,omitempty"`
}, n)
for i := 0; i < n; i++ {
id := startID + i