Files

2858 lines
82 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package elasticsearch
import (
"bytes"
"cmp"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"reflect"
"regexp"
"slices"
"sort"
"strconv"
"strings"
"ragflow/internal/common"
"ragflow/internal/engine/types"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/json-iterator/go"
"go.uber.org/zap"
)
var jsonIterator = jsoniter.Config{
SortMapKeys: false,
}.Froze()
var memoryMessageVectorFieldRE = regexp.MustCompile(`^q_\d+_vec$`)
var (
elasticsearchHighlightEmTagRE = regexp.MustCompile(`<em>[^<>]+</em>`)
elasticsearchHighlightNewlineRE = regexp.MustCompile(`[\r\n]`)
elasticsearchHighlightDelimiterRE = regexp.MustCompile(`[.?!;\n]`)
elasticsearchLetterRE = regexp.MustCompile(`\pL`)
elasticsearchEnglishLetterRE = regexp.MustCompile(`[A-Za-z]`)
)
// CreateChunkStore creates an index
func (e *elasticsearchEngine) CreateChunkStore(ctx context.Context, baseName, datasetID string, vectorSize int, parserID string) error {
if baseName == "" {
return fmt.Errorf("index name cannot be empty")
}
// Check if index already exists
exists, err := e.indexExists(ctx, baseName)
if err != nil {
return fmt.Errorf("failed to check index existence: %w", err)
}
if exists {
if strings.HasPrefix(baseName, "memory_") {
if err := e.ensureMemoryMessageVectorMapping(ctx, baseName, vectorSize); err != nil {
return fmt.Errorf("failed to ensure memory vector mapping: %w", err)
}
common.Info("Memory index already exists, ensured vector mapping", zap.String("index_name", baseName), zap.Int("vector_size", vectorSize))
return nil
}
common.Info("Index already exists, skipping creation", zap.String("index_name", baseName))
return nil
}
// Load mapping based on index type
var mapping map[string]interface{}
if strings.HasPrefix(baseName, "memory_") {
mapping = getMemoryMessageMapping(vectorSize)
} else if datasetID == "skill" {
// Load skill-specific mapping
skillMapping, err := loadSkillMapping()
if err != nil {
return fmt.Errorf("failed to load skill mapping: %w", err)
}
mapping = skillMapping
} else {
// Default mapping for dataset
mapping = map[string]interface{}{
"settings": map[string]interface{}{
"number_of_shards": 1,
"number_of_replicas": 0,
},
}
}
// Prepare request body
var body io.Reader
if mapping != nil {
data, err := json.Marshal(mapping)
if err != nil {
return fmt.Errorf("failed to marshal mapping: %w", err)
}
body = bytes.NewReader(data)
}
// Create index
req := esapi.IndicesCreateRequest{
Index: baseName,
Body: body,
}
res, err := req.Do(ctx, e.client)
if err != nil {
return fmt.Errorf("failed to create index: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
reason := extractErrorReason(bodyBytes)
if reason != "" {
return fmt.Errorf("elasticsearch error: %s", reason)
}
return fmt.Errorf("elasticsearch returned error: %s, body: %s", res.Status(), string(bodyBytes))
}
// Parse response
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return fmt.Errorf("failed to parse response: %w", err)
}
acknowledged, ok := result["acknowledged"].(bool)
if !ok || !acknowledged {
return fmt.Errorf("index creation not acknowledged")
}
common.Info("Successfully created Elasticsearch index", zap.String("index_name", baseName))
return nil
}
// InsertChunks inserts chunks into a chunk index
// If a chunk with the same id + doc_id + kb_id already exists, it will be updated with the new value
func (e *elasticsearchEngine) InsertChunks(ctx context.Context, chunks []map[string]interface{}, baseName string, datasetID string) ([]string, error) {
common.Info("ElasticsearchConnection.InsertChunks called", zap.String("index_name", baseName), zap.Int("chunkCount", len(chunks)))
if len(chunks) == 0 {
return []string{}, nil
}
if baseName == "" {
return nil, fmt.Errorf("index name cannot be empty")
}
if strings.HasPrefix(baseName, "memory_") {
if err := e.ensureMemoryMessageVectorMappingsForDocs(ctx, baseName, chunks); err != nil {
return nil, err
}
}
// Build bulk request body with index operations (upsert behavior: insert if not exists, update if exists)
var buf bytes.Buffer
for _, doc := range chunks {
docID, _ := doc["doc_id"].(string)
chunkID, _ := doc["id"].(string)
if docID == "" || chunkID == "" {
common.Warn("Skipping chunk without doc_id or id")
continue
}
// Action line: use json.Marshal to properly escape string values
action, err := json.Marshal(map[string]interface{}{
"index": map[string]interface{}{
"_index": baseName,
"_id": chunkID,
},
})
if err != nil {
common.Error("Failed to marshal bulk action", err)
return nil, fmt.Errorf("failed to marshal bulk action: %w", err)
}
buf.Write(action)
buf.WriteByte('\n')
// Document line: work with a copy to avoid mutating the original
docCopy := copyFields(doc)
docCopy["kb_id"] = datasetID
if err := jsonIterator.NewEncoder(&buf).Encode(docCopy); err != nil {
return nil, fmt.Errorf("failed to encode document: %w", err)
}
}
// Execute bulk request with refresh="wait_for"
req := esapi.BulkRequest{
Body: bytes.NewReader(buf.Bytes()),
Refresh: "wait_for",
}
res, err := req.Do(ctx, e.client)
if err != nil {
common.Error("Failed to execute bulk request", err)
return nil, fmt.Errorf("failed to execute bulk request: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status(), "body", string(bodyBytes))
return nil, fmt.Errorf("elasticsearch bulk request returned error: %s, body: %s", res.Status(), string(bodyBytes))
}
// Parse bulk response
var bulkResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&bulkResponse); err != nil {
common.Error("Failed to parse bulk response", err)
return nil, fmt.Errorf("failed to parse bulk response: %w", err)
}
// Check for errors in bulk response
if errors, ok := bulkResponse["errors"].(bool); ok && errors {
common.Warn("Bulk request had some errors")
// Could iterate through items to find specific errors if needed
}
common.Info("ElasticsearchConnection.InsertChunks result", zap.String("index_name", baseName), zap.Int("count", len(chunks)))
return []string{}, nil
}
// UpdateChunks updates chunks by condition
func (e *elasticsearchEngine) UpdateChunks(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, baseName string, datasetID string) error {
fullIndexName := baseName
common.Info("ElasticsearchConnection.UpdateChunks called", zap.String("index_name", fullIndexName), zap.Any("condition", condition), zap.Any("new_value", newValue))
if fullIndexName == "" {
return fmt.Errorf("index name cannot be empty")
}
// Check if index exists
exists, err := e.indexExists(ctx, fullIndexName)
if err != nil {
common.Error("Failed to check index existence", err)
return fmt.Errorf("failed to check index existence: %w", err)
}
if !exists {
return fmt.Errorf("index '%s' does not exist", fullIndexName)
}
if strings.HasPrefix(fullIndexName, "memory_") {
condition["memory_id"] = datasetID
if messageDocID, ok := condition["id"].(string); ok {
return e.updateSingleMemoryMessage(ctx, fullIndexName, messageDocID, newValue)
}
return e.updateChunksByQuery(ctx, fullIndexName, mapMemoryMessageESConditionFields(condition), mapMemoryMessageESUpdateFields(newValue))
}
// Add kb_id to condition
condition["kb_id"] = datasetID
// Case 1: Single document update (when condition["id"] is a string)
if chunkID, ok := condition["id"].(string); ok {
return e.updateSingleChunk(ctx, fullIndexName, chunkID, newValue)
}
// Case 2: Multi-document update via UpdateByQuery
return e.updateChunksByQuery(ctx, fullIndexName, condition, newValue)
}
func (e *elasticsearchEngine) updateSingleMemoryMessage(ctx context.Context, indexName, messageDocID string, newValue map[string]interface{}) error {
doc := mapMemoryMessageESUpdateFields(newValue)
delete(doc, "id")
if len(doc) == 0 {
return nil
}
updateBody := map[string]interface{}{"doc": doc}
body, err := json.Marshal(updateBody)
if err != nil {
return fmt.Errorf("failed to marshal memory message update request: %w", err)
}
req := esapi.UpdateRequest{
Index: indexName,
DocumentID: messageDocID,
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, e.client)
if err != nil {
return fmt.Errorf("failed to update memory message: %w", err)
}
defer res.Body.Close()
if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, messageDocID)
}
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("elasticsearch memory message update error: %s, body: %s", res.Status(), string(bodyBytes))
}
return nil
}
func mapMemoryMessageESUpdateFields(newValue map[string]interface{}) map[string]interface{} {
doc := make(map[string]interface{}, len(newValue))
for k, v := range newValue {
switch k {
case "remove", "add":
doc[k] = v
default:
doc[mapMemoryMessageESField(k, false)] = v
}
}
return doc
}
func mapMemoryMessageESConditionFields(condition map[string]interface{}) map[string]interface{} {
mapped := make(map[string]interface{}, len(condition))
for k, v := range condition {
mapped[mapMemoryMessageESField(k, false)] = v
}
return mapped
}
// updateSingleChunk handles single document update
func (e *elasticsearchEngine) updateSingleChunk(ctx context.Context, indexName, chunkID string, newValue map[string]interface{}) error {
common.Debug("ElasticsearchConnection.updateSingleChunk called", zap.String("indexName", indexName), zap.String("chunkID", chunkID))
// First find the document by id field to get the actual _id
searchReq := map[string]interface{}{
"query": map[string]interface{}{
"term": map[string]interface{}{"id": chunkID},
},
}
body, err := json.Marshal(searchReq)
if err != nil {
return fmt.Errorf("failed to marshal search request: %w", err)
}
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(indexName),
e.client.Search.WithBody(bytes.NewReader(body)),
)
if err != nil {
return fmt.Errorf("failed to search for chunk: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("failed to search for chunk: %s", res.Status())
}
var searchResult map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil {
return fmt.Errorf("failed to parse search response: %w", err)
}
hits, ok := searchResult["hits"].(map[string]interface{})
if !ok {
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
hitList, ok := hits["hits"].([]interface{})
if !ok || len(hitList) == 0 {
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
firstHit, ok := hitList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
actualID, ok := firstHit["_id"].(string)
if !ok {
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
doc := copyFields(newValue)
delete(doc, "id")
removeValue, _ := doc["remove"]
delete(doc, "remove")
removeField, _ := removeValue.(string)
removeDict, _ := removeValue.(map[string]interface{})
// Remove *_feas fields
var feasFields []string
for k := range doc {
if strings.HasSuffix(k, "feas") {
feasFields = append(feasFields, k)
}
}
for _, k := range feasFields {
scriptBody := map[string]interface{}{
"script": map[string]interface{}{
"source": fmt.Sprintf("ctx._source.remove(\"%s\");", k),
},
}
body, _ := json.Marshal(scriptBody)
req := esapi.UpdateRequest{
Index: indexName,
DocumentID: actualID,
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, e.client)
if err != nil {
common.Warn("Failed to remove feas field", zap.String("field", k), zap.Error(err))
} else {
res.Body.Close()
}
}
// Remove specific field if removeField is set
if removeField != "" {
scriptBody := map[string]interface{}{
"script": map[string]interface{}{
"source": fmt.Sprintf("ctx._source.remove('%s');", removeField),
},
}
body, _ := json.Marshal(scriptBody)
req := esapi.UpdateRequest{
Index: indexName,
DocumentID: actualID,
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, e.client)
if err != nil {
common.Warn("Failed to remove field", zap.String("field", removeField), zap.Error(err))
} else {
res.Body.Close()
}
}
// Remove specific values from array fields (removeDict)
if removeDict != nil {
scripts := []string{}
params := make(map[string]interface{})
for kk, vv := range removeDict {
scripts = append(scripts,
fmt.Sprintf("if (ctx._source.containsKey('%s') && ctx._source.%s != null) { int i = ctx._source.%s.indexOf(params.p_%s); if (i >= 0) { ctx._source.%s.remove(i); }}",
kk, kk, kk, kk, kk))
params[fmt.Sprintf("p_%s", kk)] = vv
}
if scripts != nil {
scriptBody := map[string]interface{}{
"script": map[string]interface{}{
"source": strings.Join(scripts, ""),
"params": params,
},
}
body, _ := json.Marshal(scriptBody)
req := esapi.UpdateRequest{
Index: indexName,
DocumentID: actualID,
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, e.client)
if err != nil {
common.Warn("Failed to remove dict fields", zap.Error(err))
} else {
res.Body.Close()
}
}
}
// Update document fields if any remain
if len(doc) > 0 {
updateBody := map[string]interface{}{"doc": doc}
body, _ := json.Marshal(updateBody)
req := esapi.UpdateRequest{
Index: indexName,
DocumentID: actualID,
Body: bytes.NewReader(body),
}
res, err := req.Do(ctx, e.client)
if err != nil {
return fmt.Errorf("failed to update document: %w", err)
}
defer res.Body.Close()
if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return fmt.Errorf("%w: %s", types.ErrDocumentNotFound, chunkID)
}
return fmt.Errorf("elasticsearch update error: %s", res.Status())
}
}
common.Debug("ElasticsearchConnection.updateSingleChunk completed", zap.String("indexName", indexName), zap.String("chunkID", chunkID))
return nil
}
// updateChunksByQuery handles multi-document update
func (e *elasticsearchEngine) updateChunksByQuery(ctx context.Context, indexName string, condition map[string]interface{}, newValue map[string]interface{}) error {
common.Debug("ElasticsearchConnection.updateChunksByQuery called", zap.String("indexName", indexName))
// Build bool query from condition
var mustClauses []map[string]interface{}
for k, v := range condition {
if k == "exists" {
mustClauses = append(mustClauses, map[string]interface{}{
"exists": map[string]interface{}{"field": v},
})
continue
}
if v == nil || v == "" {
continue
}
if listVal, ok := v.([]interface{}); ok {
mustClauses = append(mustClauses, map[string]interface{}{
"terms": map[string]interface{}{k: listVal},
})
} else if _, ok := v.(string); ok {
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{k: v},
})
} else if _, ok := v.(int); ok {
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{k: v},
})
}
}
boolQuery := map[string]interface{}{
"bool": map[string]interface{}{
"filter": mustClauses,
},
}
// Build painless scripts from newValue
var scripts []string
params := make(map[string]interface{})
for k, v := range newValue {
if k == "remove" {
if removeStr, ok := v.(string); ok {
scripts = append(scripts, fmt.Sprintf("ctx._source.remove('%s');", removeStr))
continue
}
if removeDict, ok := v.(map[string]interface{}); ok {
for kk, vv := range removeDict {
scripts = append(scripts,
fmt.Sprintf("if (ctx._source.containsKey('%s') && ctx._source.%s != null) { int i = ctx._source.%s.indexOf(params.p_%s); if (i >= 0) { ctx._source.%s.remove(i); }}",
kk, kk, kk, kk, kk))
params[fmt.Sprintf("p_%s", kk)] = vv
}
}
continue
}
if k == "add" {
if addDict, ok := v.(map[string]interface{}); ok {
for kk, vv := range addDict {
vvStr, ok := vv.(string)
if ok {
vvStr = strings.TrimSpace(vvStr)
scripts = append(scripts, fmt.Sprintf("ctx._source.%s.add(params.pp_%s);", kk, kk))
params[fmt.Sprintf("pp_%s", kk)] = vvStr
}
}
}
continue
}
if (k == "" || v == nil) && k != "available_int" {
continue
}
switch val := v.(type) {
case string:
// Sanitize: replace ' \n \r with space
sanitized := sanitizeString(val)
params[fmt.Sprintf("pp_%s", k)] = sanitized
scripts = append(scripts, fmt.Sprintf("ctx._source.%s=params.pp_%s;", k, k))
case int, float64:
scripts = append(scripts, fmt.Sprintf("ctx._source.%s=%v;", k, val))
case []interface{}:
params[fmt.Sprintf("pp_%s", k)] = val
scripts = append(scripts, fmt.Sprintf("ctx._source.%s=params.pp_%s;", k, k))
}
}
scriptSource := strings.Join(scripts, "")
// Build update by query body
updateBody := map[string]interface{}{
"query": boolQuery,
"script": map[string]interface{}{
"source": scriptSource,
"params": params,
},
}
bodyBytes, err := json.Marshal(updateBody)
if err != nil {
return fmt.Errorf("failed to marshal update body: %w", err)
}
// Execute update by query with refresh=true, slices=5, conflicts="proceed"
refreshTrue := true
req := esapi.UpdateByQueryRequest{
Index: []string{indexName},
Body: bytes.NewReader(bodyBytes),
Refresh: &refreshTrue,
Slices: 5,
Conflicts: "proceed",
}
res, err := req.Do(ctx, e.client)
if err != nil {
common.Error("Failed to execute update by query", err)
return fmt.Errorf("failed to execute update by query: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("elasticsearch update by query error: %s, body: %s", res.Status(), string(bodyBytes))
}
common.Debug("ElasticsearchConnection.updateChunksByQuery completed", zap.String("indexName", indexName))
return nil
}
// sanitizeString replaces ' \n \r with space
func sanitizeString(s string) string {
s = strings.ReplaceAll(s, "'", " ")
s = strings.ReplaceAll(s, "\n", " ")
s = strings.ReplaceAll(s, "\r", " ")
return strings.TrimSpace(s)
}
// copyFields creates a shallow copy of a map
func copyFields(m map[string]interface{}) map[string]interface{} {
result := make(map[string]interface{})
for k, v := range m {
result[k] = v
}
return result
}
// DeleteChunks deletes chunks from a dataset index by condition
func (e *elasticsearchEngine) DeleteChunks(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) {
// For ES, index name is just indexName (e.g., "ragflow_{tenantID}"), not indexName_datasetID
fullIndexName := indexName
common.Info("Deleting chunks from Elasticsearch index", zap.String("index_name", fullIndexName), zap.Any("condition", condition))
// Check if index exists
exists, err := e.indexExists(ctx, fullIndexName)
if err != nil {
return 0, fmt.Errorf("failed to check index existence: %w", err)
}
if !exists {
common.Warn(fmt.Sprintf("Index %s does not exist, skipping delete", fullIndexName))
return 0, nil
}
// Build bool query from condition
var mustClauses []map[string]interface{}
var filterClauses []map[string]interface{}
var mustNotClauses []map[string]interface{}
// Handle chunk IDs - use terms query on "id" field instead of ids query on _id
if idVal, ok := condition["id"]; ok && idVal != nil {
switch v := idVal.(type) {
case []interface{}:
ids := make([]string, 0, len(v))
for _, id := range v {
if s, ok := id.(string); ok {
ids = append(ids, s)
}
}
if len(ids) > 0 {
mustClauses = append(mustClauses, map[string]interface{}{
"terms": map[string]interface{}{"id": ids},
})
}
case string:
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{"id": v},
})
}
}
// Handle kb_id - add as term filter
if kbID, ok := condition["kb_id"].(string); ok && kbID != "" {
filterClauses = append(filterClauses, map[string]interface{}{
"term": map[string]interface{}{"kb_id": kbID},
})
}
// Add all other conditions as filters/must/must_not
for k, v := range condition {
if k == "id" || k == "kb_id" {
continue // Already handled above
}
if k == "exists" {
filterClauses = append(filterClauses, map[string]interface{}{
"exists": map[string]interface{}{"field": v},
})
} else if k == "must_not" {
if m, ok := v.(map[string]interface{}); ok {
for kk, vv := range m {
if kk == "exists" {
mustNotClauses = append(mustNotClauses, map[string]interface{}{
"exists": map[string]interface{}{"field": vv},
})
}
}
}
} else if v != nil {
if listVal, ok := v.([]interface{}); ok {
mustClauses = append(mustClauses, map[string]interface{}{
"terms": map[string]interface{}{k: listVal},
})
} else if _, ok := v.(string); ok {
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{k: v},
})
} else if _, ok := v.(int); ok {
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{k: v},
})
}
}
}
// Build the query
var qry map[string]interface{}
if len(filterClauses) == 0 && len(mustClauses) == 0 && len(mustNotClauses) == 0 {
qry = map[string]interface{}{"match_all": map[string]interface{}{}}
} else {
boolMap := map[string]interface{}{}
if len(filterClauses) > 0 {
boolMap["filter"] = filterClauses
}
if len(mustClauses) > 0 {
boolMap["must"] = mustClauses
}
if len(mustNotClauses) > 0 {
boolMap["must_not"] = mustNotClauses
}
qry = map[string]interface{}{"bool": boolMap}
}
// Build delete by query body
deleteBody := map[string]interface{}{
"query": qry,
}
bodyBytes, err := json.Marshal(deleteBody)
if err != nil {
return 0, fmt.Errorf("failed to marshal delete body: %w", err)
}
// Execute delete by query with refresh=true
refreshTrue := true
req := esapi.DeleteByQueryRequest{
Index: []string{fullIndexName},
Body: bytes.NewReader(bodyBytes),
Refresh: &refreshTrue,
}
res, err := req.Do(ctx, e.client)
if err != nil {
common.Error("Failed to execute delete by query", err)
if strings.Contains(err.Error(), "not_found") {
return 0, nil
}
return 0, fmt.Errorf("failed to execute delete by query: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
errStr := string(bodyBytes)
if strings.Contains(errStr, "not_found") {
return 0, nil
}
common.Sugar.Errorw("Elasticsearch delete by query returned error", "status", res.Status())
return 0, fmt.Errorf("elasticsearch delete by query returned error: %s", res.Status())
}
// Parse response
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
common.Error("Failed to parse delete response", err)
return 0, fmt.Errorf("failed to parse delete response: %w", err)
}
deleted := int64(0)
if d, ok := result["deleted"].(float64); ok {
deleted = int64(d)
}
common.Info("Successfully deleted chunks", zap.String("index_name", fullIndexName), zap.Int64("deleted_count", deleted))
return deleted, nil
}
// SearchResponse Elasticsearch search response
type SearchResponse struct {
Hits struct {
Total struct {
Value int64 `json:"value"`
} `json:"total"`
Hits []struct {
ID string `json:"_id"`
Index string `json:"_index"`
Score float64 `json:"_score"`
Source map[string]interface{} `json:"_source"`
Fields map[string]interface{} `json:"fields"` // ES 9.x stores dense_vector here
Highlight map[string]interface{} `json:"highlight,omitempty"`
// Sort is populated when the request body specifies a `sort`
// clause. The last hit's Sort is the cursor for the next
// search_after request — without it, deep pagination can't
// advance.
Sort []interface{} `json:"sort,omitempty"`
} `json:"hits"`
} `json:"hits"`
Aggregations map[string]interface{} `json:"aggregations"`
}
// Search executes search with unified types.SearchRequest
func (e *elasticsearchEngine) Search(ctx context.Context, req *types.SearchRequest) (*types.SearchResult, error) {
types.LogSearchRequest("Elasticsearch", req)
// Validate inputs and set defaults
if len(req.IndexNames) == 0 {
return nil, fmt.Errorf("index names cannot be empty")
}
offset := req.Offset
if offset < 0 {
offset = 0
}
limit := req.Limit
if limit <= 0 {
limit = 30
}
// Detect index types
isSkillIndex := false
isMemoryIndex := false
for _, idx := range req.IndexNames {
if strings.HasPrefix(idx, "skill_") {
isSkillIndex = true
}
if strings.HasPrefix(idx, "memory_") {
isMemoryIndex = true
}
}
// Build bool query from condition
boolQuery := buildBoolQueryFromCondition(req.Filter, req.KbIDs, isSkillIndex, isMemoryIndex)
// Extract vector_similarity_weight from FusionExpr
var matchText *types.MatchTextExpr
var matchDense *types.MatchDenseExpr
vectorSimilarityWeight := 0.5
for _, expr := range req.MatchExprs {
if expr == nil {
continue
}
switch m := expr.(type) {
case *types.FusionExpr:
if m.Method == "weighted_sum" {
if weights, ok := m.FusionParams["weights"].(string); ok {
// Assert structure only when FusionExpr has weighted_sum with weights
if len(req.MatchExprs) != 3 {
return nil, fmt.Errorf("match_expressions must have exactly 3 elements with FusionExpr, got %d", len(req.MatchExprs))
}
if _, ok := req.MatchExprs[0].(*types.MatchTextExpr); !ok {
return nil, fmt.Errorf("match_expressions[0] must be MatchTextExpr")
}
if _, ok := req.MatchExprs[1].(*types.MatchDenseExpr); !ok {
return nil, fmt.Errorf("match_expressions[1] must be MatchDenseExpr")
}
if _, ok := req.MatchExprs[2].(*types.FusionExpr); !ok {
return nil, fmt.Errorf("match_expressions[2] must be FusionExpr")
}
parts := strings.Split(weights, ",")
if len(parts) == 2 {
if w, err := strconv.ParseFloat(parts[1], 64); err == nil {
vectorSimilarityWeight = w
}
}
}
}
case *types.MatchTextExpr:
matchText = m
case *types.MatchDenseExpr:
matchDense = m
}
}
// Build query body with text match and/or knn match
queryBody := make(map[string]interface{})
if matchText != nil {
textQuery := buildQueryStringQuery(matchText, vectorSimilarityWeight, isSkillIndex, isMemoryIndex)
if boolQuery != nil {
if boolMap, ok := boolQuery["bool"].(map[string]interface{}); ok {
if must, ok := boolMap["must"].([]interface{}); ok {
must = append(must, textQuery)
boolMap["must"] = must
} else {
boolMap["must"] = []interface{}{textQuery}
}
boolMap["boost"] = 1.0 - vectorSimilarityWeight
}
} else {
boolQuery = textQuery
}
}
hasVectorMatch := matchDense != nil && len(matchDense.EmbeddingData) > 0
if hasVectorMatch {
if isMemoryIndex {
if err := e.ensureMemoryMessageSearchVectorMappings(ctx, req.IndexNames, matchDense.VectorColumnName, len(matchDense.EmbeddingData)); err != nil {
return nil, err
}
}
k := matchDense.TopN
if k <= 0 {
k = limit
}
if k <= 0 {
k = 1024
}
numCandidates := k * 2
similarity := 0.0
if matchDense.ExtraOptions != nil {
if sim, ok := matchDense.ExtraOptions["similarity"].(float64); ok {
similarity = sim
}
}
vectorFieldName := matchDense.VectorColumnName
knnQuery := map[string]interface{}{
"field": vectorFieldName,
"query_vector": matchDense.EmbeddingData,
"k": k,
"num_candidates": numCandidates,
"similarity": similarity,
"filter": boolQuery,
}
queryBody["knn"] = knnQuery
if boolQuery != nil {
queryBody["query"] = boolQuery
}
} else if boolQuery != nil {
queryBody["query"] = boolQuery
} else {
queryBody["query"] = map[string]interface{}{
"match_all": map[string]interface{}{},
}
}
// Add rank_feature queries
if req.RankFeature != nil && len(req.RankFeature) > 0 && !isSkillIndex && !isMemoryIndex {
rankFeatureQuery := buildRankFeatureQuery(req.RankFeature)
if rankFeatureQuery != nil {
if boolQuery, ok := queryBody["query"].(map[string]interface{}); ok {
if boolMap, ok := boolQuery["bool"].(map[string]interface{}); ok {
if should, ok := boolMap["should"].([]interface{}); ok {
for _, q := range rankFeatureQuery {
boolMap["should"] = append(should, q)
}
} else {
interfaceSlice := make([]interface{}, len(rankFeatureQuery))
for i, q := range rankFeatureQuery {
interfaceSlice[i] = q
}
boolMap["should"] = interfaceSlice
}
}
}
}
}
// Add sorting if order_by specified
if req.OrderBy != nil && len(req.OrderBy.Fields) > 0 {
sort := parseOrderByExpr(req.OrderBy)
if len(sort) > 0 {
queryBody["sort"] = sort
}
}
// Determine use_search_after for deep pagination
//
// ES rejects from + size combinations where from + size > index.max_result_window
// (default 10,000) — see https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html.
// For those requests we must drop `from` and walk the result set with
// `search_after` instead. The preconditions mirror the Python reference
// (rag/utils/es_conn.py):
// - explicit OrderBy is required (search_after needs a stable cursor,
// and _score / KNN-similarity sorts are not unique enough to be safe)
// - no dense/KNN match (knn queries do not honour `search_after` in the
// same way and the Python path explicitly disallows them here)
hasDense := hasVectorMatch
hasExplicitSort := req.OrderBy != nil && len(req.OrderBy.Fields) > 0
useSearchAfter := limit > 0 && (offset+limit > common.MAX_RESULT_WINDOW) && hasExplicitSort && !hasDense
// Apply offset/limit pagination. When useSearchAfter is true, the
// caller is going to drive pagination via searchAfterCursor()
// instead, so we must NOT emit from/size here — leaving them out
// is the whole point of routing to the search_after path.
if !useSearchAfter && limit > 0 {
queryBody["size"] = limit
queryBody["from"] = offset
}
// Set _source and fields for vector fields
hasTextMatch := matchText != nil
selectFields := req.SelectFields
if isMemoryIndex {
selectFields = mapMemoryMessageESFields(req.SelectFields, false)
}
if len(req.SelectFields) > 0 {
// Use caller-specified fields, add pagerank_fld/tag_fld if needed
queryBody["_source"] = selectFields
if hasTextMatch || hasVectorMatch {
if !isSkillIndex && !isMemoryIndex {
if !slices.Contains(selectFields, common.PAGERANK_FLD) {
queryBody["_source"] = append(queryBody["_source"].([]string), common.PAGERANK_FLD)
}
if !slices.Contains(selectFields, common.TAG_FLD) {
queryBody["_source"] = append(queryBody["_source"].([]string), common.TAG_FLD)
}
}
}
var vectorFields []string
for _, f := range selectFields {
if strings.HasSuffix(f, "_vec") {
vectorFields = append(vectorFields, f)
}
}
if len(vectorFields) > 0 {
queryBody["fields"] = vectorFields
}
} else {
// No explicit SelectFields - use match_all, but add pagerank_fld/tag_fld for scoring if needed
if hasTextMatch || hasVectorMatch {
if !isSkillIndex && !isMemoryIndex {
queryBody["_source"] = []string{common.PAGERANK_FLD, common.TAG_FLD}
}
}
}
// Serialize query
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(queryBody); err != nil {
return nil, fmt.Errorf("error encoding query: %w", err)
}
// Execute search. When useSearchAfter is true we must NOT send
// from/size (we dropped them above) and instead walk the result set
// page-by-page with the search_after cursor — ES otherwise returns
// the first page and the caller gets the wrong page.
var (
totalHits int64
allResults []map[string]interface{}
err error
)
if useSearchAfter {
allResults, totalHits, err = e.searchAfterCursor(ctx, req, queryBody, offset, limit)
if err != nil {
return nil, err
}
} else {
// WithBody takes an io.Reader that the Go client streams
// directly into the request. Reusing &buf across iterations
// would drain it on the first request and leave the rest
// with an empty body — so we copy the bytes once and hand
// each iteration a fresh bytes.NewReader.
payload := append([]byte(nil), buf.Bytes()...)
for _, indexName := range req.IndexNames {
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(indexName),
e.client.Search.WithBody(bytes.NewReader(payload)),
e.client.Search.WithTrackTotalHits(true),
)
if err != nil {
common.Warn("Elasticsearch query failed", zap.String("index", indexName), zap.Error(err))
continue
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
common.Warn("Elasticsearch error response", zap.String("index", indexName), zap.String("body", string(bodyBytes)))
continue
}
// Parse response and return results
var esResp SearchResponse
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
common.Warn("Elasticsearch failed to parse response", zap.String("index", indexName), zap.Error(err))
continue
}
searchChunks := convertESResponse(&esResp, "")
totalHits += esResp.Hits.Total.Value
allResults = append(allResults, searchChunks...)
}
}
if isMemoryIndex {
normalizeMemoryMessageChunks(allResults)
}
// Post-processing: Sort results by score
if len(allResults) > 0 && (matchText != nil || hasVectorMatch) {
scoreColumn := "_score"
if matchText != nil && hasVectorMatch {
scoreColumn = "SCORE"
}
pagerankField := common.PAGERANK_FLD
if isSkillIndex {
pagerankField = ""
}
if isMemoryIndex {
pagerankField = ""
}
allResults = calculateScores(allResults, scoreColumn, pagerankField)
allResults = sortByScore(allResults, limit)
}
common.Info("ES Search completed", zap.Int("returnedRows", len(allResults)), zap.Int64("totalHits", totalHits))
return &types.SearchResult{
Chunks: allResults,
Total: totalHits,
}, nil
}
// searchAfterFetcher issues one ES search request with the given batch
// size and search_after cursor, returning the decoded response. Defined
// as a function type so the pagination logic below can be unit-tested
// with a mock fetcher instead of a real Elasticsearch client.
type searchAfterFetcher func(
ctx context.Context,
baseQuery map[string]interface{},
batch int,
cursor []interface{},
trackTotalHits bool,
) (SearchResponse, error)
// searchAfterCursor walks ES with the search_after pagination protocol,
// returning the page [offset, offset+limit) of an explicitly-sorted
// result set. Used when offset+limit exceeds common.MAX_RESULT_WINDOW
// and ES would otherwise reject the from/size combination.
//
// Mirrors rag/utils/es_conn.py:ESConnection._search_with_search_after:
//
// 1. Drop from/size from the base query (the caller has already omitted
// them on this path; this is a defensive no-op).
// 2. Skip phase: discard hits until we have skipped `offset` of them.
// 3. Take phase: collect hits until we have `limit` of them, or the
// index is exhausted.
// 4. After each batch, advance the cursor with the last hit's `sort`
// field. If `sort` is missing or unchanged, the index is exhausted.
//
// The first request carries trackTotalHits=true so the caller still
// gets an accurate total; subsequent requests skip it for efficiency.
// Returns the (possibly empty) collected hits and the total hit count
// from the first response.
func (e *elasticsearchEngine) searchAfterCursor(
ctx context.Context,
req *types.SearchRequest,
baseQuery map[string]interface{},
offset, limit int,
) ([]map[string]interface{}, int64, error) {
// Defensive: strip from/size if the caller left them in. In the
// current code path they are never set when useSearchAfter is true,
// but the base query is a shared map and future callers may forget.
delete(baseQuery, "from")
delete(baseQuery, "size")
return searchAfterPaginate(ctx, baseQuery, offset, limit, e.buildSearchAfterFetcher(req))
}
// buildSearchAfterFetcher returns a fetcher that delegates each
// iteration to executeSearchRequest, which talks to the real ES client.
func (e *elasticsearchEngine) buildSearchAfterFetcher(req *types.SearchRequest) searchAfterFetcher {
return func(
ctx context.Context,
baseQuery map[string]interface{},
batch int,
cursor []interface{},
trackTotalHits bool,
) (SearchResponse, error) {
return e.executeSearchRequest(ctx, req, baseQuery, batch, cursor, trackTotalHits)
}
}
// searchAfterPaginate is the pure, callback-driven pagination loop
// shared by the engine and the unit tests. See searchAfterCursor for
// the semantics.
func searchAfterPaginate(
ctx context.Context,
baseQuery map[string]interface{},
offset, limit int,
fetch searchAfterFetcher,
) ([]map[string]interface{}, int64, error) {
var (
cursor []interface{}
totalHits int64
collected []map[string]interface{}
collectedTake int
firstCall = true
)
// Skip phase: walk past `offset` hits without retaining them.
remainingSkip := offset
for remainingSkip > 0 {
batch := remainingSkip
if batch > common.SearchAfterBatchSize {
batch = common.SearchAfterBatchSize
}
resp, err := fetch(ctx, baseQuery, batch, cursor, firstCall)
firstCall = false
if err != nil {
return nil, 0, err
}
if totalHits == 0 {
totalHits = resp.Hits.Total.Value
}
if len(resp.Hits.Hits) == 0 {
break
}
nextCursor := resp.Hits.Hits[len(resp.Hits.Hits)-1].Sort
if len(nextCursor) == 0 || sortValuesEqual(nextCursor, cursor) {
// ES returned hits but no usable cursor (e.g. sort field
// missing or unchanged). The index is exhausted from our
// point of view.
break
}
cursor = nextCursor
remainingSkip -= len(resp.Hits.Hits)
if len(resp.Hits.Hits) < batch {
// Short batch — we asked for more than was available, so
// the cursor is at the end of the index.
break
}
}
// Take phase: collect up to `limit` hits. ES may return up to
// `batch` hits per request, but we stop at `limit` (the absolute
// target) regardless of how many we asked for in this iteration.
for collectedTake < limit {
want := limit - collectedTake
batch := want
if batch > common.SearchAfterBatchSize {
batch = common.SearchAfterBatchSize
}
resp, err := fetch(ctx, baseQuery, batch, cursor, firstCall)
firstCall = false
if err != nil {
return nil, 0, err
}
if totalHits == 0 {
totalHits = resp.Hits.Total.Value
}
if len(resp.Hits.Hits) == 0 {
break
}
// Convert and append. We could parallelize the conversion with
// the next request, but conversion is cheap relative to the
// ES round-trip, so keep the loop straightforward.
for _, hit := range resp.Hits.Hits {
if collectedTake >= limit {
break
}
chunk := hit.Source
if chunk == nil {
chunk = map[string]interface{}{}
}
chunk["_score"] = hit.Score
chunk["_id"] = hit.ID
chunk["_index"] = hit.Index
collected = append(collected, chunk)
collectedTake++
}
// Reached the absolute limit — stop without advancing the
// cursor (we already have what was asked for).
if collectedTake >= limit {
break
}
nextCursor := resp.Hits.Hits[len(resp.Hits.Hits)-1].Sort
if len(nextCursor) == 0 || sortValuesEqual(nextCursor, cursor) {
break
}
cursor = nextCursor
if len(resp.Hits.Hits) < batch {
break
}
}
// If we never sent a request (e.g. offset == 0 and limit == 0) we
// still need a total. Issue one count-only request.
if totalHits == 0 {
resp, err := fetch(ctx, baseQuery, 0, nil, true)
if err != nil {
return nil, 0, err
}
totalHits = resp.Hits.Total.Value
}
return collected, totalHits, nil
}
// executeSearchRequest sends one ES search request with the given
// batch size and search_after cursor. If trackTotalHits is true the
// request asks ES to compute an exact total (cheap to omit on
// pagination iterations after the first).
func (e *elasticsearchEngine) executeSearchRequest(
ctx context.Context,
req *types.SearchRequest,
baseQuery map[string]interface{},
batch int,
cursor []interface{},
trackTotalHits bool,
) (SearchResponse, error) {
queryBody := make(map[string]interface{}, len(baseQuery)+2)
for k, v := range baseQuery {
queryBody[k] = v
}
if batch > 0 {
queryBody["size"] = batch
}
if len(cursor) > 0 {
queryBody["search_after"] = cursor
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(queryBody); err != nil {
return SearchResponse{}, fmt.Errorf("error encoding query: %w", err)
}
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(req.IndexNames...),
e.client.Search.WithBody(&buf),
e.client.Search.WithTrackTotalHits(trackTotalHits),
)
if err != nil {
return SearchResponse{}, fmt.Errorf("elasticsearch search failed: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return SearchResponse{}, fmt.Errorf("elasticsearch error response: %s", string(bodyBytes))
}
var esResp SearchResponse
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
return SearchResponse{}, fmt.Errorf("elasticsearch failed to parse response: %w", err)
}
return esResp, nil
}
// sortValuesEqual reports whether two sort cursors are identical.
// ES guarantees that successive requests with `search_after: <cursor>`
// advance strictly past the cursor, so an unchanged cursor between
// iterations means the index is exhausted.
func sortValuesEqual(a, b []interface{}) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
func mapMemoryMessageESField(field string, useTokenizedContent bool) string {
name := field
boost := ""
if base, suffix, ok := strings.Cut(field, "^"); ok {
name = base
boost = "^" + suffix
}
switch name {
case "message_type":
name = "message_type_kwd"
case "status":
name = "status_int"
case "content":
if useTokenizedContent {
name = "tokenized_content_ltks"
} else {
name = "content_ltks"
}
}
return name + boost
}
func mapMemoryMessageESFields(fields []string, useTokenizedContent bool) []string {
if len(fields) == 0 {
return fields
}
mapped := make([]string, 0, len(fields))
seen := make(map[string]struct{}, len(fields))
for _, field := range fields {
mappedField := mapMemoryMessageESField(field, useTokenizedContent)
if _, ok := seen[mappedField]; ok {
continue
}
seen[mappedField] = struct{}{}
mapped = append(mapped, mappedField)
}
return mapped
}
func normalizeMemoryMessageChunks(chunks []map[string]interface{}) {
for _, chunk := range chunks {
for key, val := range chunk {
if memoryMessageVectorFieldRE.MatchString(key) {
chunk["content_embed"] = val
delete(chunk, key)
}
}
if val, ok := chunk["message_type_kwd"]; ok {
chunk["message_type"] = val
delete(chunk, "message_type_kwd")
}
if val, ok := chunk["status_int"]; ok {
chunk["status"] = memoryMessageStatusBool(val)
delete(chunk, "status_int")
}
if val, ok := chunk["content_ltks"]; ok {
chunk["content"] = val
delete(chunk, "content_ltks")
}
}
}
func memoryMessageStatusBool(value interface{}) bool {
switch v := value.(type) {
case bool:
return v
case int:
return v != 0
case int64:
return v != 0
case float64:
return v != 0
case json.Number:
n, err := v.Int64()
return err == nil && n != 0
case string:
return v != "" && v != "0" && !strings.EqualFold(v, "false")
default:
return false
}
}
// buildBoolQueryFromCondition builds an ES bool query from condition map.
// Skill indexes use status, regular chunk indexes use kb_id, and memory
// message indexes use memory_id plus message-specific storage fields.
func buildBoolQueryFromCondition(filter map[string]interface{}, kbIDs []string, isSkillIndex, isMemoryIndex bool) map[string]interface{} {
var mustClauses []interface{}
var filterClauses []interface{}
var shouldClauses []interface{}
// Memory message indexes use memory_id, regular chunk indexes use kb_id.
if kbIDs != nil && len(kbIDs) > 0 {
fieldName := "kb_id"
if isMemoryIndex {
fieldName = "memory_id"
}
filterClauses = append(filterClauses, map[string]interface{}{
"terms": map[string]interface{}{fieldName: kbIDs},
})
}
// For skill index, add status = "1" filter by default (active skills)
if isSkillIndex {
filterClauses = append(filterClauses, map[string]interface{}{
"term": map[string]interface{}{
"status": "1",
},
})
}
if filter == nil {
filter = make(map[string]interface{})
}
for k, v := range filter {
if isMemoryIndex {
k = mapMemoryMessageESField(k, false)
}
// For skill index, handle 'status' field instead of 'available_int'
if isSkillIndex && k == "status" {
if v == nil || v == "" {
continue
}
if listVal, ok := v.([]interface{}); ok && len(listVal) > 0 {
filterClauses = append(filterClauses, map[string]interface{}{
"terms": map[string]interface{}{"status": listVal},
})
} else if strVal, ok := v.(string); ok && strVal != "" {
filterClauses = append(filterClauses, map[string]interface{}{
"term": map[string]interface{}{"status": strVal},
})
}
continue
}
if k == "available_int" {
var numVal float64
switch val := v.(type) {
case float64:
numVal = val
case int:
numVal = float64(val)
case int64:
numVal = float64(val)
default:
continue
}
if numVal == 0 {
filterClauses = append(filterClauses, map[string]interface{}{
"range": map[string]interface{}{"available_int": map[string]interface{}{"lt": 1}},
})
} else {
filterClauses = append(filterClauses, map[string]interface{}{
"bool": map[string]interface{}{
"must_not": []map[string]interface{}{
{"range": map[string]interface{}{"available_int": map[string]interface{}{"lt": 1}}},
},
},
})
}
continue
}
if k == "id" {
if v == nil || v == "" {
continue
}
if listVal, ok := v.([]interface{}); ok && len(listVal) > 0 {
shouldClauses = append(shouldClauses,
map[string]interface{}{"terms": map[string]interface{}{"id": listVal}},
map[string]interface{}{"terms": map[string]interface{}{"_id": listVal}},
)
} else if strVal, ok := v.(string); ok && strVal != "" {
shouldClauses = append(shouldClauses,
map[string]interface{}{"term": map[string]interface{}{"id": strVal}},
map[string]interface{}{"term": map[string]interface{}{"_id": strVal}},
)
} else if intVal, ok := v.(int); ok && intVal != 0 {
shouldClauses = append(shouldClauses,
map[string]interface{}{"term": map[string]interface{}{"id": intVal}},
map[string]interface{}{"term": map[string]interface{}{"_id": intVal}},
)
}
continue
}
if v == nil || v == "" {
continue
}
if isMemoryIndex && k == "session_id" {
if strVal, ok := v.(string); ok && strVal != "" {
filterClauses = append(filterClauses, map[string]interface{}{
"query_string": map[string]interface{}{
"query": fmt.Sprintf("*%s*", strVal),
"fields": []string{"session_id"},
"analyze_wildcard": true,
},
})
continue
}
}
if listVal, ok := v.([]interface{}); ok {
filterClauses = append(filterClauses, map[string]interface{}{
"terms": map[string]interface{}{k: listVal},
})
} else if strListVal, ok := v.([]string); ok {
filterClauses = append(filterClauses, map[string]interface{}{
"terms": map[string]interface{}{k: strListVal},
})
} else if strVal, ok := v.(string); ok && strVal != "" {
filterClauses = append(filterClauses, map[string]interface{}{
"term": map[string]interface{}{k: strVal},
})
} else if intVal, ok := v.(int); ok {
filterClauses = append(filterClauses, map[string]interface{}{
"term": map[string]interface{}{k: intVal},
})
} else if floatVal, ok := v.(float64); ok {
filterClauses = append(filterClauses, map[string]interface{}{
"term": map[string]interface{}{k: floatVal},
})
}
}
// Build the bool query
boolQuery := make(map[string]interface{})
if len(mustClauses) > 0 {
boolQuery["must"] = mustClauses
}
if len(filterClauses) > 0 {
boolQuery["filter"] = filterClauses
}
if len(shouldClauses) > 0 {
boolQuery["should"] = shouldClauses
boolQuery["minimum_should_match"] = 1
}
if len(boolQuery) == 0 {
return nil
}
return map[string]interface{}{"bool": boolQuery}
}
// buildQueryStringQuery builds a query_string query from MatchTextExpr
// When isSkillIndex is true, uses skill-specific fields (name_tks, tags_tks, etc.)
// Otherwise uses document fields (title_tks, content_ltks, etc.)
func buildQueryStringQuery(matchText *types.MatchTextExpr, vectorSimilarityWeight float64, isSkillIndex, isMemoryIndex bool) map[string]interface{} {
if matchText == nil {
return nil
}
minimumShouldMatch := "0%"
if matchText.ExtraOptions != nil {
if msm, ok := matchText.ExtraOptions["minimum_should_match"].(float64); ok {
minimumShouldMatch = fmt.Sprintf("%d%%", int(msm*100))
}
}
fields := matchText.Fields
if fields == nil || len(fields) == 0 {
if isSkillIndex {
fields = []string{"name_tks^10", "tags_tks^5", "description_tks^3", "content_tks^1"}
} else if isMemoryIndex {
fields = []string{"tokenized_content_ltks"}
} else {
fields = []string{"title_tks^10", "title_sm_tks^5", "important_kwd^30", "important_tks^20", "question_tks^20", "content_ltks^2", "content_sm_ltks"}
}
}
if isMemoryIndex {
fields = mapMemoryMessageESFields(fields, true)
}
boost := 1.0
if matchText.ExtraOptions != nil {
if b, ok := matchText.ExtraOptions["boost"].(float64); ok {
boost = b
}
}
return map[string]interface{}{
"query_string": map[string]interface{}{
"fields": fields,
"type": "best_fields",
"query": matchText.MatchingText,
"minimum_should_match": minimumShouldMatch,
"boost": boost,
},
}
}
// buildRankFeatureQuery builds rank_feature queries for learning to rank
func buildRankFeatureQuery(rankFeature map[string]float64) []map[string]interface{} {
if rankFeature == nil || len(rankFeature) == 0 {
return nil
}
// Sort keys for deterministic query order (Go map iteration is randomized)
keys := make([]string, 0, len(rankFeature))
for k := range rankFeature {
keys = append(keys, k)
}
sort.Strings(keys)
var queries []map[string]interface{}
for _, fld := range keys {
if fld == common.PAGERANK_FLD {
continue
}
sc := rankFeature[fld]
tagField := fmt.Sprintf("%s.%s", common.TAG_FLD, fld)
queries = append(queries, map[string]interface{}{
"rank_feature": map[string]interface{}{
"field": tagField,
"linear": map[string]interface{}{},
"boost": sc,
},
})
}
return queries
}
// GetChunk gets a chunk by ID using ES search API
func (e *elasticsearchEngine) GetChunk(ctx context.Context, baseName, chunkID string, datasetIDs []string) (interface{}, error) {
if strings.HasPrefix(baseName, "memory_") {
return e.getMemoryMessage(ctx, baseName, chunkID)
}
// Try search by doc_id field (which is stored in the document)
for _, datasetID := range datasetIDs {
searchReq := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{"term": map[string]interface{}{"id": chunkID}},
{"term": map[string]interface{}{"kb_id": datasetID}},
},
},
},
}
body, err := json.Marshal(searchReq)
if err != nil {
return nil, fmt.Errorf("failed to marshal search request: %w", err)
}
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(baseName),
e.client.Search.WithBody(bytes.NewReader(body)),
)
if err != nil {
return nil, fmt.Errorf("failed to search for chunk: %w", err)
}
if res.IsError() {
res.Body.Close()
return nil, fmt.Errorf("failed to search for chunk: %s", res.Status())
}
var searchResult map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil {
res.Body.Close()
return nil, fmt.Errorf("failed to parse search response: %w", err)
}
res.Body.Close()
hits, ok := searchResult["hits"].(map[string]interface{})
if !ok {
continue
}
hitList, ok := hits["hits"].([]interface{})
if !ok || len(hitList) == 0 {
continue
}
firstHit, ok := hitList[0].(map[string]interface{})
if !ok {
continue
}
source, ok := firstHit["_source"].(map[string]interface{})
if !ok {
continue
}
common.Info("GetChunk found hit", zap.String("baseName", baseName), zap.String("chunkID", chunkID))
source["id"] = chunkID
return source, nil
}
common.Info("GetChunk no hits found", zap.String("baseName", baseName), zap.String("chunkID", chunkID))
return nil, nil
}
func (e *elasticsearchEngine) getMemoryMessage(ctx context.Context, indexName, docID string) (interface{}, error) {
req := esapi.GetRequest{
Index: indexName,
DocumentID: docID,
}
res, err := req.Do(ctx, e.client)
if err != nil {
return nil, fmt.Errorf("failed to get memory message: %w", err)
}
defer res.Body.Close()
if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("%w: %s", types.ErrDocumentNotFound, docID)
}
return nil, fmt.Errorf("elasticsearch memory message get error: %s", res.Status())
}
var getResult struct {
Found bool `json:"found"`
Source map[string]interface{} `json:"_source"`
}
if err := json.NewDecoder(res.Body).Decode(&getResult); err != nil {
return nil, fmt.Errorf("failed to parse memory message get response: %w", err)
}
if !getResult.Found || getResult.Source == nil {
return nil, nil
}
message := getResult.Source
message["id"] = docID
normalizeMemoryMessageChunks([]map[string]interface{}{message})
return message, nil
}
// GetFields extracts the requested fields from ES search response chunks
//
// Unlike Infinity, Elasticsearch does NOT use convertSelectFields before querying.
// The original requested field names ARE the database column names:
// - "content_with_weight" is stored and returned as "content_with_weight"
// - No field name mapping is needed in GetFields
func (e *elasticsearchEngine) GetFields(chunks []map[string]interface{}, fields []string) map[string]map[string]interface{} {
common.Info("GetFields called", zap.Int("chunkCount", len(chunks)), zap.Strings("fields", fields))
result := make(map[string]map[string]interface{})
if len(fields) == 0 || len(chunks) == 0 {
return result
}
// Build field set for lookup
fieldSet := make(map[string]bool)
for _, f := range fields {
fieldSet[f] = true
}
for _, chunk := range chunks {
docID, ok := elasticsearchChunkID(chunk)
if !ok {
continue
}
if id, ok := chunk["id"].(string); !ok || id == "" {
chunk["id"] = docID
}
m := make(map[string]interface{})
for field := range fieldSet {
val := chunk[field]
if val == nil {
continue
}
if listVal, ok := val.([]interface{}); ok {
if len(listVal) == 1 {
if _, isArray := listVal[0].([]interface{}); !isArray {
val = listVal[0]
}
}
}
if _, ok := val.([]interface{}); ok {
m[field] = val
continue
}
if field == "available_int" {
if _, ok := val.(int); ok {
m[field] = val
continue
}
if _, ok := val.(float64); ok {
m[field] = val
continue
}
}
if _, ok := val.(string); !ok {
val = fmt.Sprintf("%v", val)
}
m[field] = val
}
if len(m) > 0 {
result[docID] = m
}
}
common.Info("GetFields result", zap.Int("resultCount", len(result)), zap.Strings("keys", func() []string {
keys := make([]string, 0, len(result))
for k := range result {
keys = append(keys, k)
}
return keys
}()))
return result
}
// GetAggregation aggregates chunk values by field name
// Input: [{"docnm_kwd": "docA"}, {"docnm_kwd": "docA"}, {"docnm_kwd": "docB"}]
// Returns: [{"key": "docA", "count": 2}, {"key": "docB", "count": 1}]
func (e *elasticsearchEngine) GetAggregation(chunks []map[string]interface{}, fieldName string) []map[string]interface{} {
if len(chunks) == 0 || fieldName == "" {
return []map[string]interface{}{}
}
tagCounts := make(map[string]int)
for _, chunk := range chunks {
value, ok := chunk[fieldName]
if !ok || value == nil {
continue
}
if valueStr, ok := value.(string); ok {
if valueStr == "" {
continue
}
separator := ","
if fieldName == "tag_kwd" && strings.Contains(valueStr, "###") {
separator = "###"
}
for _, tag := range strings.Split(valueStr, separator) {
countElasticsearchAggregationTag(tagCounts, tag)
}
continue
}
if valueList, ok := value.([]interface{}); ok {
for _, item := range valueList {
if itemStr, ok := item.(string); ok {
countElasticsearchAggregationTag(tagCounts, itemStr)
}
}
}
}
if len(tagCounts) == 0 {
return []map[string]interface{}{}
}
tags := make([]string, 0, len(tagCounts))
for tag := range tagCounts {
tags = append(tags, tag)
}
slices.SortFunc(tags, func(a, b string) int {
if byCount := cmp.Compare(tagCounts[b], tagCounts[a]); byCount != 0 {
return byCount
}
return cmp.Compare(a, b)
})
result := make([]map[string]interface{}, len(tags))
for i, tag := range tags {
result[i] = map[string]interface{}{"key": tag, "count": tagCounts[tag]}
}
return result
}
// GetChunkIDs extracts chunk IDs from ES search response chunks.
// Uses _id field (composite: {doc_id}_{kb_id}_{chunk_id}).
func (e *elasticsearchEngine) GetChunkIDs(chunks []map[string]interface{}) []string {
ids := make([]string, 0, len(chunks))
for _, chunk := range chunks {
if id, ok := elasticsearchChunkID(chunk); ok {
ids = append(ids, id)
}
}
return ids
}
// GetHighlight returns highlighted text for matching keywords
func (e *elasticsearchEngine) GetHighlight(chunks []map[string]interface{}, keywords []string, fieldName string) map[string]string {
result := make(map[string]string)
if len(chunks) == 0 || len(keywords) == 0 {
return result
}
normalizedKeywords := normalizeElasticsearchHighlightKeywords(keywords)
englishPatterns := compileElasticsearchHighlightPatterns(normalizedKeywords)
nonEnglishPattern := compileElasticsearchNonEnglishHighlightPattern(normalizedKeywords)
for _, chunk := range chunks {
docID, ok := elasticsearchChunkID(chunk)
if !ok {
continue
}
if highlightText := firstElasticsearchHighlight(chunk); highlightText != "" {
result[docID] = highlightText
continue
}
txt, ok := chunk[fieldName].(string)
if fieldName == "content_with_weight" && (!ok || txt == "") {
txt, ok = chunk["content"].(string)
}
if !ok || txt == "" {
continue
}
if elasticsearchHighlightEmTagRE.MatchString(txt) {
result[docID] = txt
continue
}
txt = elasticsearchHighlightNewlineRE.ReplaceAllString(txt, " ")
segments := elasticsearchHighlightDelimiterRE.Split(txt, -1)
var highlightedSegments []string
for _, segment := range segments {
segmentToCheck := segment
if isMostlyEnglishElasticsearchSegment(segment) {
for _, pattern := range englishPatterns {
segmentToCheck = pattern.ReplaceAllString(segmentToCheck, "$1<em>$2</em>$3")
}
} else if nonEnglishPattern != nil {
segmentToCheck = nonEnglishPattern.ReplaceAllStringFunc(segmentToCheck, func(match string) string {
return "<em>" + match + "</em>"
})
}
if segmentToCheck != segment {
highlightedSegments = append(highlightedSegments, strings.TrimSpace(segmentToCheck))
}
}
if len(highlightedSegments) > 0 {
result[docID] = strings.Join(highlightedSegments, "... ")
}
}
return result
}
func elasticsearchChunkID(chunk map[string]interface{}) (string, bool) {
if id, ok := chunk["id"].(string); ok && id != "" {
return id, true
}
if id, ok := chunk["_id"].(string); ok && id != "" {
return id, true
}
return "", false
}
func firstElasticsearchHighlight(chunk map[string]interface{}) string {
highlight, ok := chunk["highlight"].(map[string]interface{})
if !ok || len(highlight) == 0 {
return ""
}
for _, vals := range highlight {
if arr, ok := vals.([]interface{}); ok && len(arr) > 0 {
if str, ok := arr[0].(string); ok {
return str
}
}
}
return ""
}
func countElasticsearchAggregationTag(counts map[string]int, tag string) {
if tag = strings.TrimSpace(tag); tag != "" {
counts[tag]++
}
}
func isMostlyEnglishElasticsearchSegment(segment string) bool {
totalCount := len(elasticsearchLetterRE.FindAllString(segment, -1))
return totalCount > 0 && float64(len(elasticsearchEnglishLetterRE.FindAllString(segment, -1)))/float64(totalCount) > 0.5
}
func compileElasticsearchHighlightPatterns(keywords []string) []*regexp.Regexp {
patterns := make([]*regexp.Regexp, 0, len(keywords))
for _, kw := range keywords {
patterns = append(patterns, regexp.MustCompile(`(?i)(^|[ .?/'\"\(\)!,:;-])(`+regexp.QuoteMeta(kw)+`)([ .?/'\"\(\)!,:;-]|$)`))
}
return patterns
}
func compileElasticsearchNonEnglishHighlightPattern(keywords []string) *regexp.Regexp {
if len(keywords) == 0 {
return nil
}
parts := make([]string, 0, len(keywords))
for _, kw := range keywords {
parts = append(parts, regexp.QuoteMeta(kw))
}
return regexp.MustCompile(strings.Join(parts, "|"))
}
func normalizeElasticsearchHighlightKeywords(keywords []string) []string {
seen := make(map[string]struct{}, len(keywords))
normalized := make([]string, 0, len(keywords))
for _, kw := range keywords {
if kw == "" {
continue
}
if _, ok := seen[kw]; !ok {
seen[kw] = struct{}{}
normalized = append(normalized, kw)
}
}
slices.SortStableFunc(normalized, func(a, b string) int {
return cmp.Compare(len(b), len(a))
})
return normalized
}
// DropChunkStore deletes a chunk index
func (e *elasticsearchEngine) DropChunkStore(ctx context.Context, baseName, datasetID string) error {
return e.dropIndex(ctx, baseName)
}
// ChunkStoreExists checks if a chunk index exists
func (e *elasticsearchEngine) ChunkStoreExists(ctx context.Context, baseName, datasetID string) (bool, error) {
return e.indexExists(ctx, baseName)
}
// KNNScores performs a second-pass KNN search to get clean cosine similarities for ES.
// This keeps chunk vectors in the index and asks ES to compute the cosine similarity.
func (e *elasticsearchEngine) KNNScores(ctx context.Context, chunks []map[string]interface{}, queryVector []float64, topK int) (map[string]interface{}, error) {
if len(chunks) == 0 || len(queryVector) == 0 {
return nil, nil
}
// Extract chunk IDs from first search results
chunkIDs := make([]string, 0, len(chunks))
for _, chunk := range chunks {
if id, ok := chunk["_id"].(string); ok {
chunkIDs = append(chunkIDs, id)
}
}
if len(chunkIDs) == 0 {
return nil, nil
}
common.Info("KNNScores starting", zap.Int("chunkCount", len(chunkIDs)), zap.Strings("chunkIDs", chunkIDs), zap.Int("vectorSize", len(queryVector)))
// Build KNN-only query filtered by chunk IDs
vectorSize := len(queryVector)
k := len(chunkIDs)
knnQuery := map[string]interface{}{
"field": fmt.Sprintf("q_%d_vec", vectorSize),
"query_vector": queryVector,
"k": k,
"num_candidates": k * 2,
"similarity": 0.0, // No threshold - get all
"filter": map[string]interface{}{
"terms": map[string]interface{}{"id": chunkIDs},
},
}
queryBody := map[string]interface{}{
"knn": knnQuery,
"size": k,
"_source": false, // Don't need source fields, only need _id and _score
}
body, err := json.Marshal(queryBody)
if err != nil {
return nil, fmt.Errorf("failed to marshal KNN query: %w", err)
}
//common.Info("KNNScores query body", zap.String("body", string(body)))
// Execute search - use first index name from chunks if available
indexName := ""
if len(chunks) > 0 {
if idx, ok := chunks[0]["_index"].(string); ok {
indexName = idx
}
}
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(indexName),
e.client.Search.WithBody(bytes.NewReader(body)),
)
if err != nil {
return nil, fmt.Errorf("KNN scores search failed: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("KNN scores search returned error: %s, body: %s", res.Status(), string(bodyBytes))
}
var esResp SearchResponse
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
return nil, fmt.Errorf("failed to parse KNN scores response: %w", err)
}
common.Info("KNNScores ES response", zap.Int("hitCount", len(esResp.Hits.Hits)), zap.Any("firstHit", func() interface{} {
if len(esResp.Hits.Hits) > 0 {
return esResp.Hits.Hits[0]
}
return nil
}()))
// Return raw ES response
// Caller will pass to GetScores to extract scores
knnResult := make(map[string]interface{})
knnResult["hits"] = map[string]interface{}{
"hits": esResp.Hits.Hits,
}
return knnResult, nil
}
// GetScores extracts similarity scores from KNN search result
func (e *elasticsearchEngine) GetScores(knnResult map[string]interface{}) map[string]float64 {
scores := make(map[string]float64)
hits, ok := knnResult["hits"].(map[string]interface{})
if !ok {
return scores
}
hitsList, ok := hits["hits"]
if !ok {
return scores
}
switch v := hitsList.(type) {
case []interface{}:
for _, h := range v {
if hit, ok := h.(map[string]interface{}); ok {
if docID, ok := hit["_id"].(string); ok && docID != "" {
if scoreVal := hit["_score"]; scoreVal != nil {
if score, ok := scoreVal.(float64); ok {
scores[docID] = score
}
}
}
}
}
case []map[string]interface{}:
for _, hit := range v {
if docID, ok := hit["_id"].(string); ok && docID != "" {
if scoreVal := hit["_score"]; scoreVal != nil {
if score, ok := scoreVal.(float64); ok {
scores[docID] = score
}
}
}
}
default:
// Handle slice of structs via reflection
rv := reflect.ValueOf(v)
if rv.Kind() == reflect.Slice {
for i := 0; i < rv.Len(); i++ {
elem := rv.Index(i)
idField := elem.FieldByName("ID")
if !idField.IsValid() {
idField = elem.FieldByName("Id")
}
if !idField.IsValid() || idField.Kind() != reflect.String {
continue
}
docID := idField.String()
if docID == "" {
continue
}
scoreField := elem.FieldByName("Score")
if !scoreField.IsValid() || scoreField.Kind() != reflect.Float64 {
continue
}
scores[docID] = scoreField.Float()
}
}
}
return scores
}
// loadSkillMapping loads the skill index mapping from config file
func loadSkillMapping() (map[string]interface{}, error) {
// Try multiple possible locations for the mapping file
possiblePaths := []string{
"conf/skill_es_mapping.json",
"../conf/skill_es_mapping.json",
"/app/conf/skill_es_mapping.json",
}
var data []byte
var err error
for _, path := range possiblePaths {
data, err = os.ReadFile(path)
if err == nil {
break
}
}
if err != nil {
// Fallback to default skill mapping if file not found
return getDefaultSkillMapping(), nil
}
var mapping map[string]interface{}
if err := json.Unmarshal(data, &mapping); err != nil {
return nil, fmt.Errorf("failed to parse skill mapping: %w", err)
}
return mapping, nil
}
func memoryMessageVectorField(vectorSize int) string {
return fmt.Sprintf("q_%d_vec", vectorSize)
}
func memoryMessageVectorProperty(vectorSize int) map[string]interface{} {
return map[string]interface{}{
"type": "dense_vector",
"dims": vectorSize,
"index": true,
"similarity": "cosine",
}
}
func parseMemoryMessageVectorSize(field string) (int, bool) {
if !memoryMessageVectorFieldRE.MatchString(field) {
return 0, false
}
sizeText := strings.TrimSuffix(strings.TrimPrefix(field, "q_"), "_vec")
vectorSize, err := strconv.Atoi(sizeText)
if err != nil || vectorSize <= 0 {
return 0, false
}
return vectorSize, true
}
func (e *elasticsearchEngine) memoryMessageVectorMappingExists(ctx context.Context, indexName, fieldName string) (bool, error) {
req := esapi.IndicesGetMappingRequest{
Index: []string{indexName},
}
res, err := req.Do(ctx, e.client)
if err != nil {
return false, fmt.Errorf("failed to get memory vector mapping: %w", err)
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
return false, nil
}
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
reason := extractErrorReason(bodyBytes)
if reason != "" {
return false, fmt.Errorf("elasticsearch error getting memory vector mapping %s.%s: %s", indexName, fieldName, reason)
}
return false, fmt.Errorf("elasticsearch returned error getting memory vector mapping %s.%s: %s, body: %s", indexName, fieldName, res.Status(), string(bodyBytes))
}
var mappings map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&mappings); err != nil {
return false, fmt.Errorf("failed to decode memory vector mapping: %w", err)
}
indexMapping, ok := mappings[indexName].(map[string]interface{})
if !ok {
return false, nil
}
mapping, ok := indexMapping["mappings"].(map[string]interface{})
if !ok {
return false, nil
}
properties, ok := mapping["properties"].(map[string]interface{})
if !ok {
return false, nil
}
_, ok = properties[fieldName]
return ok, nil
}
func (e *elasticsearchEngine) ensureMemoryMessageVectorMapping(ctx context.Context, indexName string, vectorSize int) error {
if vectorSize <= 0 {
return fmt.Errorf("memory vector size must be positive, got %d", vectorSize)
}
fieldName := memoryMessageVectorField(vectorSize)
exists, err := e.memoryMessageVectorMappingExists(ctx, indexName, fieldName)
if err != nil {
return err
}
if exists {
return nil
}
body := map[string]interface{}{
"properties": map[string]interface{}{
fieldName: memoryMessageVectorProperty(vectorSize),
},
}
data, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("failed to marshal memory vector mapping: %w", err)
}
req := esapi.IndicesPutMappingRequest{
Index: []string{indexName},
Body: bytes.NewReader(data),
}
res, err := req.Do(ctx, e.client)
if err != nil {
return fmt.Errorf("failed to update memory vector mapping: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
reason := extractErrorReason(bodyBytes)
if reason != "" {
return fmt.Errorf("elasticsearch error updating memory vector mapping %s.%s: %s", indexName, fieldName, reason)
}
return fmt.Errorf("elasticsearch returned error updating memory vector mapping %s.%s: %s, body: %s", indexName, fieldName, res.Status(), string(bodyBytes))
}
return nil
}
func (e *elasticsearchEngine) ensureMemoryMessageVectorMappingsForDocs(ctx context.Context, indexName string, chunks []map[string]interface{}) error {
seen := map[int]struct{}{}
for _, chunk := range chunks {
for field := range chunk {
vectorSize, ok := parseMemoryMessageVectorSize(field)
if !ok {
continue
}
if _, ok := seen[vectorSize]; ok {
continue
}
if err := e.ensureMemoryMessageVectorMapping(ctx, indexName, vectorSize); err != nil {
return err
}
seen[vectorSize] = struct{}{}
}
}
return nil
}
func (e *elasticsearchEngine) ensureMemoryMessageSearchVectorMappings(ctx context.Context, indexNames []string, vectorFieldName string, fallbackVectorSize int) error {
vectorSize, ok := parseMemoryMessageVectorSize(vectorFieldName)
if !ok {
vectorSize = fallbackVectorSize
}
if vectorSize <= 0 {
return fmt.Errorf("memory vector size must be positive, got %d", vectorSize)
}
for _, indexName := range indexNames {
if !strings.HasPrefix(indexName, "memory_") {
continue
}
exists, err := e.indexExists(ctx, indexName)
if err != nil {
return fmt.Errorf("failed to check memory index existence: %w", err)
}
if !exists {
continue
}
if err := e.ensureMemoryMessageVectorMapping(ctx, indexName, vectorSize); err != nil {
return err
}
}
return nil
}
func getMemoryMessageMapping(vectorSize int) map[string]interface{} {
vectorField := memoryMessageVectorField(vectorSize)
return map[string]interface{}{
"settings": map[string]interface{}{
"number_of_shards": 1,
"number_of_replicas": 0,
},
"mappings": map[string]interface{}{
"properties": map[string]interface{}{
"id": map[string]interface{}{
"type": "keyword",
},
"doc_id": map[string]interface{}{
"type": "keyword",
},
"kb_id": map[string]interface{}{
"type": "keyword",
},
"memory_id": map[string]interface{}{
"type": "keyword",
},
"user_id": map[string]interface{}{
"type": "keyword",
},
"agent_id": map[string]interface{}{
"type": "keyword",
},
"session_id": map[string]interface{}{
"type": "keyword",
},
"message_id": map[string]interface{}{
"type": "long",
},
"source_id": map[string]interface{}{
"type": "long",
},
"message_type_kwd": map[string]interface{}{
"type": "keyword",
},
"status_int": map[string]interface{}{
"type": "integer",
},
"content": map[string]interface{}{
"type": "text",
"index": false,
},
"content_ltks": map[string]interface{}{
"type": "text",
"analyzer": "whitespace",
},
"tokenized_content_ltks": map[string]interface{}{
"type": "text",
"analyzer": "whitespace",
},
"valid_at": map[string]interface{}{
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis",
},
"invalid_at": map[string]interface{}{
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis",
},
"forget_at": map[string]interface{}{
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis",
},
vectorField: memoryMessageVectorProperty(vectorSize),
},
},
}
}
// getDefaultSkillMapping returns the default skill index mapping
func getDefaultSkillMapping() map[string]interface{} {
return map[string]interface{}{
"settings": map[string]interface{}{
"index": map[string]interface{}{
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": "1000ms",
},
},
"mappings": map[string]interface{}{
"dynamic": false,
"properties": map[string]interface{}{
"skill_id": map[string]interface{}{
"type": "keyword",
"store": true,
},
"name": map[string]interface{}{
"type": "text",
"index": false,
"store": true,
},
"name_tks": map[string]interface{}{
"type": "text",
"analyzer": "whitespace",
"store": true,
},
"tags": map[string]interface{}{
"type": "text",
"index": false,
"store": true,
},
"tags_tks": map[string]interface{}{
"type": "text",
"analyzer": "whitespace",
"store": true,
},
"description": map[string]interface{}{
"type": "text",
"index": false,
"store": true,
},
"description_tks": map[string]interface{}{
"type": "text",
"analyzer": "whitespace",
"store": true,
},
"content": map[string]interface{}{
"type": "text",
"index": false,
"store": true,
},
"content_tks": map[string]interface{}{
"type": "text",
"analyzer": "whitespace",
"store": true,
},
"q_3072_vec": map[string]interface{}{
"type": "dense_vector",
"dims": 3072,
"index": true,
"similarity": "cosine",
},
"q_2560_vec": map[string]interface{}{
"type": "dense_vector",
"dims": 2560,
"index": true,
"similarity": "cosine",
},
"q_1536_vec": map[string]interface{}{
"type": "dense_vector",
"dims": 1536,
"index": true,
"similarity": "cosine",
},
"q_1024_vec": map[string]interface{}{
"type": "dense_vector",
"dims": 1024,
"index": true,
"similarity": "cosine",
},
"q_768_vec": map[string]interface{}{
"type": "dense_vector",
"dims": 768,
"index": true,
"similarity": "cosine",
},
"q_512_vec": map[string]interface{}{
"type": "dense_vector",
"dims": 512,
"index": true,
"similarity": "cosine",
},
"q_256_vec": map[string]interface{}{
"type": "dense_vector",
"dims": 256,
"index": true,
"similarity": "cosine",
},
"version": map[string]interface{}{
"type": "keyword",
"store": true,
},
"status": map[string]interface{}{
"type": "keyword",
"store": true,
},
"create_time": map[string]interface{}{
"type": "long",
"store": true,
},
"update_time": map[string]interface{}{
"type": "long",
"store": true,
},
},
},
}
}
// rerankWindow returns the candidate-window size shared by retrieval's
// block fetch and slice. Mirrors Dealer._rerank_window in rag/nlp/search.py.
//
// `size` is the per-page size; the window MUST be an exact multiple of it,
// otherwise the block fetched (offset // window) and the in-block page slice
// (offset % window) drift apart and deep pagination silently drops results.
//
// The window targets a provider-friendly pool of ~64 candidates, bounded by
// `topK` when given (i.e. when an external reranker is active), and is always
// rounded UP to a whole number of pages to preserve the alignment invariant.
func rerankWindow(size, topK int) int {
if size <= 1 {
if topK > 0 {
return min(30, topK)
}
return 30
}
window := ((64 + size - 1) / size) * size // ceil(64/size) * size
if topK > 0 {
if aligned := ((topK + size - 1) / size) * size; window > aligned {
window = aligned
}
}
return window
}
// calculatePagination calculates offset and limit based on page, size and topK
func calculatePagination(page, size, topK int) (int, int) {
if page < 1 {
page = 1
}
if size <= 0 {
size = 30
}
if topK <= 0 {
topK = 1024
}
window := rerankWindow(size, topK)
offset := (page - 1) * window
if offset < 0 {
offset = 0
}
return offset, window
}
// convertESResponse converts ES SearchResponse to unified chunks format
func convertESResponse(esResp *SearchResponse, vectorFieldName string) []map[string]interface{} {
if esResp == nil || esResp.Hits.Hits == nil {
return []map[string]interface{}{}
}
chunks := make([]map[string]interface{}, len(esResp.Hits.Hits))
for i, hit := range esResp.Hits.Hits {
chunks[i] = hit.Source
if chunks[i] == nil {
chunks[i] = make(map[string]interface{})
}
chunks[i]["_score"] = hit.Score
chunks[i]["_id"] = hit.ID
chunks[i]["_index"] = hit.Index
if len(hit.Highlight) > 0 {
chunks[i]["highlight"] = hit.Highlight
}
}
return chunks
}
// parseOrderByExpr parses the OrderBy expression into ES sort format
func parseOrderByExpr(orderBy *types.OrderByExpr) []map[string]interface{} {
if orderBy == nil || len(orderBy.Fields) == 0 {
return nil
}
var result []map[string]interface{}
for _, field := range orderBy.Fields {
direction := "asc"
if field.Type == types.SortDesc {
direction = "desc"
}
// Skip id field (cannot order by text field)
if field.Field == "id" {
continue
}
// Special handling for page_num_int and top_int
if field.Field == "page_num_int" || field.Field == "top_int" {
result = append(result, map[string]interface{}{
field.Field: map[string]interface{}{
"order": direction,
"unmapped_type": "float",
"mode": "avg",
"numeric_type": "double",
},
})
} else if strings.HasSuffix(field.Field, "_int") || strings.HasSuffix(field.Field, "_flt") {
// Fields ending with _int or _flt
result = append(result, map[string]interface{}{
field.Field: map[string]interface{}{
"order": direction,
"unmapped_type": "float",
},
})
} else if field.Field == "_score" || field.Field == "score" {
result = append(result, map[string]interface{}{
"_score": direction,
})
} else {
// Default: unmapped_type = keyword
result = append(result, map[string]interface{}{
field.Field: map[string]interface{}{
"order": direction,
"unmapped_type": "keyword",
},
})
}
}
return result
}
// calculateScores calculates _score for chunks
func calculateScores(chunks []map[string]interface{}, scoreColumn, pagerankField string) []map[string]interface{} {
for i := range chunks {
score := 0.0
if scoreVal, ok := chunks[i][scoreColumn]; ok {
if f, ok := toFloat64(scoreVal); ok {
score += f
}
}
if pagerankField != "" {
if prVal, ok := chunks[i][pagerankField]; ok {
if f, ok := toFloat64(prVal); ok {
score += f
}
}
}
chunks[i]["_score"] = score
}
return chunks
}
// toFloat64 converts a value to float64
func toFloat64(v interface{}) (float64, bool) {
switch val := v.(type) {
case float64:
return val, true
case float32:
return float64(val), true
case int:
return float64(val), true
case int64:
return float64(val), true
}
return 0, false
}
// sortByScore sorts chunks by _score descending and limits
func sortByScore(chunks []map[string]interface{}, limit int) []map[string]interface{} {
if len(chunks) == 0 {
return chunks
}
// Sort by _score descending
sort.Slice(chunks, func(i, j int) bool {
scoreI := getChunkScore(chunks[i])
scoreJ := getChunkScore(chunks[j])
return scoreI > scoreJ
})
// Limit
if len(chunks) > limit && limit > 0 {
chunks = chunks[:limit]
}
return chunks
}
// getChunkScore extracts the score from a chunk
func getChunkScore(chunk map[string]interface{}) float64 {
if v, ok := chunk["_score"].(float64); ok {
return v
}
if v, ok := chunk["SCORE"].(float64); ok {
return v
}
if v, ok := chunk["SIMILARITY"].(float64); ok {
return v
}
return 0.0
}