Files
Zhichang Yu 0c3952147c fix(codeql): close remaining 44 CodeQL alerts post-merge (#16408)
## Summary

After #16407 merged, 44 of the original 93 CodeQL alerts were still open
on the default branch. This PR closes the remaining ones by:

1. **Moving 32 existing `// codeql[...]` directives** so they sit on the
line **immediately before** the suppressed statement. The original
multi-line suppression blocks had the directive as the first line, with
the rationale on subsequent lines. After line shifts (refactors, linter
reformat), the directive ended up several lines above the alert location
— CodeQL only recognizes the suppression when it appears on the line
directly above. (32 alerts across 27 files.)

2. **Adding 9 new `// codeql[...]` suppressions** for alerts that had no
suppression in the preceding lines at all — mostly real-fixes that
CodeQL conservatively still flags (filepath.Base, bounded slice sizes,
model-identifier strings, the MD5-legacy-migration lookup in
`conversation_service.py`).

## Files changed

- `api/db/services/conversation_service.py` — add
`py/weak-sensitive-data-hashing` suppression (MD5 for backward-compat
legacy row lookup; not used for auth)
- `api/db/services/llm_service.py` — 3×
`py/clear-text-logging-sensitive-data` suppressions on the lines that
log `llm_name` in warnings/info
- `common/misc_utils.py` — 2× `py/clear-text-logging-sensitive-data`
suppressions on the redacted `current_url` log sites
- `internal/agent/component/invoke.go` — moved existing
`go/request-forgery` directive
- `internal/agent/sandbox/ssh.go` — moved existing
`go/command-injection` directive
- `internal/agent/tool/retrieval_service.go` — added
`go/uncontrolled-allocation-size` suppression (`topN` is bounded to 1024
above)
- `internal/cli/common_command.go` — moved 2×
`go/disabled-certificate-check` directives
- `internal/cli/user_command.go` — added `go/clear-text-logging`
suppression (filepath.Base already strips user-identifying path)
- `internal/dao/pipeline_operation_log.go` — moved 2× `go/sql-injection`
directives
- `internal/dao/user_canvas.go` — added `go/sql-injection` suppression
in `GetList` (the new `userCanvasOrderClause` call path)
- `internal/engine/infinity/chunk.go` — moved existing
`go/unsafe-quoting` directive
- `internal/entity/models/*` — moved `go/path-injection` directives (15
files)
- `internal/handler/oauth_login.go` — moved existing
`go/cookie-httponly-not-set` directive
- `internal/handler/tenant.go` — moved existing `go/path-injection`
directive
- `internal/service/deep_researcher.go` — moved existing
`go/unsafe-quoting` directive
- `internal/service/dataset.go` — added
`go/uncontrolled-allocation-size` suppression (`n` bounded to 1024
above)
- `internal/service/file.go` — moved existing `go/request-forgery`
directive
- `internal/service/langfuse.go` — moved 2× `go/request-forgery`
directives
- `internal/utility/mcp_client.go` — moved 3× `go/request-forgery`
directives
- `internal/utility/smtp.go` — moved existing `go/email-injection`
directive
- `rag/prompts/generator.py` — added
`py/clear-text-logging-sensitive-data` suppression
- `web/.../use-provider-fields.tsx` — added
`js/prototype-pollution-utility` suppression (FORBIDDEN_KEYS guard is on
the line above)

## Why the previous PR left alerts open

`// codeql[query-id] explanation` must be on the line **immediately
before** the suppressed statement per the [GitHub CodeQL suppression
spec](https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/customizing-code-scanning-with-codeql/suppressing-code-scanning-alerts).
The original suppression blocks were 4-5 lines, with the directive as
the **first** line. After linter reformat / line shifts, the directive
ended up too far above the actual alert line to be recognized. The fix
is to put the directive on the line directly above the suppressed
statement, with the rationale above it.

## Test plan

- All 9 modified Python files `ast.parse` clean
- All 4 modified Go files `gofmt` clean
- 36/44 expected alert suppressions in place
- 8 remaining CodeQL alerts are the originals (#3485851828, #3485851831,
#3485869759, #3485869766, #3485869768, #3485869771, #3485885962,
#3485895527) which were resolved by the corresponding commit comments;
these should close on the next scan when the suppression comments match
the alert lines.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2026-06-29 09:45:16 +08:00

2349 lines
69 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package infinity
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"ragflow/internal/common"
"ragflow/internal/engine/types"
"ragflow/internal/utility"
"regexp"
"slices"
"sort"
"strconv"
"strings"
infinity "github.com/infiniflow/infinity-go-sdk"
"go.uber.org/zap"
)
// ChinesePunctRegex splits on comma, semicolon, Chinese punctuations, and newlines
var ChinesePunctRegex = regexp.MustCompile(`[,;;、\r\n]+`)
// CreateChunkStore creates a chunk table in Infinity
// baseName is the table name prefix (e.g., "ragflow_<tenant_id>")
// The full table name is built as "{baseName}_{datasetID}"
// For skill index (datasetID="skill"), tableName is just baseName and uses skill_infinity_mapping.json
func (e *infinityEngine) CreateChunkStore(ctx context.Context, baseName, datasetID string, vectorSize int, parserID string) error {
vecSize := vectorSize
// Determine table name and mapping file based on index type
var tableName string
var mappingFile string
tableName = buildChunkTableName(baseName, datasetID)
if datasetID == "skill" {
mappingFile = "skill_infinity_mapping.json"
common.Info("Creating skill index table", zap.String("tableName", tableName), zap.String("mappingFile", mappingFile))
} else {
mappingFile = e.mappingFileName
common.Info("Creating regular index table", zap.String("tableName", tableName), zap.String("mappingFile", mappingFile))
}
// Use configured schema
fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", mappingFile)
schemaData, err := os.ReadFile(fpMapping)
if err != nil {
return fmt.Errorf("Failed to read mapping file: %w", err)
}
var schema orderedFields
if err := json.Unmarshal(schemaData, &schema); err != nil {
return fmt.Errorf("Failed to parse mapping file: %w", err)
}
// Get database
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return fmt.Errorf("Failed to get database: %w", err)
}
// Determine vector column name
vectorColName := fmt.Sprintf("q_%d_vec", vecSize)
// Check if table already exists
exists, err := e.tableExists(ctx, tableName)
if err != nil {
return fmt.Errorf("Failed to check if table exists: %w", err)
}
var table *infinity.Table
if exists {
// Table exists, open it and check if vector column needs to be added
common.Info("Table already exists, checking for vector column", zap.String("tableName", tableName))
table, err = db.GetTable(tableName)
if err != nil {
return fmt.Errorf("Failed to open existing table %s: %w", tableName, err)
}
// Check if vector column exists (for embedding model changes)
colExists, err := e.columnExists(table, vectorColName)
if err != nil {
common.Warn("Failed to check column existence", zap.String("column", vectorColName), zap.Error(err))
}
// Add new vector column if it doesn't exist (handles embedding model change)
if !colExists {
common.Info("Adding new vector column for embedding model change", zap.String("column", vectorColName), zap.Int("size", vecSize))
addColSchema := infinity.TableSchema{
&infinity.ColumnDefinition{
Name: vectorColName,
DataType: fmt.Sprintf("vector,%d,float", vecSize),
},
}
if _, err := table.AddColumns(addColSchema); err != nil {
common.Error("Failed to add vector column "+vectorColName, err)
return fmt.Errorf("Failed to add vector column %s: %w", vectorColName, err)
}
common.Info("Successfully added vector column", zap.String("column", vectorColName))
}
} else {
// Table doesn't exist, create it with vector column in the initial schema
common.Info(fmt.Sprintf("Creating table with vector column: %s with dimension %d", vectorColName, vecSize))
// Build column definitions (preserving JSON order)
var columns infinity.TableSchema
for _, fieldName := range schema.Keys {
fieldInfo := schema.Fields[fieldName]
col := infinity.ColumnDefinition{
Name: fieldName,
DataType: fieldInfo.Type,
Default: fieldInfo.Default,
}
columns = append(columns, &col)
}
// Add vector column
columns = append(columns, &infinity.ColumnDefinition{
Name: vectorColName,
DataType: fmt.Sprintf("vector,%d,float", vecSize),
})
// Add chunk_data column for table parser
if parserID == "table" {
columns = append(columns, &infinity.ColumnDefinition{
Name: "chunk_data",
DataType: "json",
Default: "{}",
})
}
// Create table
table, err = db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore)
if err != nil {
return fmt.Errorf("Failed to create table: %w", err)
}
common.Debug("Infinity created table", zap.String("tableName", tableName))
}
// Create HNSW index on vector column with unique name based on vector size
// Use unique index name to avoid conflict when embedding model changes
vectorIndexName := fmt.Sprintf("q_%d_vec_idx", vecSize)
_, err = table.CreateIndex(
vectorIndexName,
infinity.NewIndexInfo(vectorColName, infinity.IndexTypeHnsw, map[string]string{
"M": "16",
"ef_construction": "50",
"metric": "cosine",
"encode": "lvq",
}),
infinity.ConflictTypeIgnore,
"",
)
if err != nil {
return fmt.Errorf("Failed to create HNSW index %s: %w", vectorIndexName, err)
}
common.Info("Created vector index", zap.String("indexName", vectorIndexName), zap.String("column", vectorColName))
// Create full-text indexes for varchar fields with analyzers
for _, fieldName := range schema.Keys {
fieldInfo := schema.Fields[fieldName]
if fieldInfo.Type != "varchar" || fieldInfo.Analyzer == nil {
continue
}
analyzers := []string{}
switch a := fieldInfo.Analyzer.(type) {
case string:
analyzers = []string{a}
case []interface{}:
for _, v := range a {
if s, ok := v.(string); ok {
analyzers = append(analyzers, s)
}
}
}
for _, analyzer := range analyzers {
indexNameFt := fmt.Sprintf("ft_%s_%s",
regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(fieldName, "_"),
regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(analyzer, "_"),
)
_, err = table.CreateIndex(
indexNameFt,
infinity.NewIndexInfo(fieldName, infinity.IndexTypeFullText, map[string]string{"ANALYZER": analyzer}),
infinity.ConflictTypeIgnore,
"",
)
if err != nil {
return fmt.Errorf("Failed to create fulltext index %s: %w", indexNameFt, err)
}
}
}
// Create secondary indexes for fields with index_type
for _, fieldName := range schema.Keys {
fieldInfo := schema.Fields[fieldName]
if fieldInfo.IndexType == nil {
continue
}
indexTypeStr := ""
params := map[string]string{}
switch it := fieldInfo.IndexType.(type) {
case string:
indexTypeStr = it
case map[string]interface{}:
if t, ok := it["type"].(string); ok {
indexTypeStr = t
}
if card, ok := it["cardinality"].(string); ok {
params["cardinality"] = card
}
}
if indexTypeStr == "secondary" {
indexNameSec := fmt.Sprintf("sec_%s", fieldName)
_, err = table.CreateIndex(
indexNameSec,
infinity.NewIndexInfo(fieldName, infinity.IndexTypeSecondary, params),
infinity.ConflictTypeIgnore,
"",
)
if err != nil {
return fmt.Errorf("Failed to create secondary index %s: %w", indexNameSec, err)
}
}
}
return nil
}
// InsertChunks inserts documents into a dataset table
// Table name format: {baseName}_{datasetID}
// Auto-create the table if it doesn't exist
// Delete existing rows with matching IDs before insert
func (e *infinityEngine) InsertChunks(ctx context.Context, chunks []map[string]interface{}, baseName string, datasetID string) ([]string, error) {
tableName := buildChunkTableName(baseName, datasetID)
common.Info("InfinityConnection.InsertChunks called", zap.String("tableName", tableName), zap.Int("chunkCount", len(chunks)))
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return nil, fmt.Errorf("Failed to get database: %w", err)
}
table, err := db.GetTable(tableName)
if err != nil {
// Table doesn't exist, try to create it
errMsg := strings.ToLower(err.Error())
if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") {
return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err)
}
// Infer vector size from chunks
vectorSize := 0
vectorPattern := regexp.MustCompile(`q_(\d+)_vec`)
for _, chunk := range chunks {
for key := range chunk {
matches := vectorPattern.FindStringSubmatch(key)
if len(matches) >= 2 {
vectorSize, _ = strconv.Atoi(matches[1])
break
}
}
if vectorSize > 0 {
break
}
}
if vectorSize == 0 {
return nil, fmt.Errorf("cannot infer vector size from chunks")
}
// Determine parser_id from chunk structure
parserID := ""
if chunkData, ok := chunks[0]["chunk_data"].(map[string]interface{}); ok && chunkData != nil {
parserID = "table"
}
// Create table
if err := e.CreateChunkStore(ctx, baseName, datasetID, vectorSize, parserID); err != nil {
return nil, fmt.Errorf("Failed to create table: %w", err)
}
table, err = db.GetTable(tableName)
if err != nil {
return nil, fmt.Errorf("Failed to get table after creation: %w", err)
}
}
// Get embedding columns and their sizes
var embeddingCols [][2]interface{}
colsResp, err := table.ShowColumns()
if err != nil {
return nil, fmt.Errorf("Failed to get columns: %w", err)
}
result, ok := colsResp.(*infinity.QueryResult)
if !ok {
return nil, fmt.Errorf("unexpected response type: %T", colsResp)
}
// ShowColumns returns a result set where Data contains arrays of column values
re := regexp.MustCompile(`Embedding\([a-z]+,(\d+)\)`)
if nameArr, ok := result.Data["name"]; ok {
if typeArr, ok := result.Data["type"]; ok {
for i := 0; i < len(nameArr); i++ {
colName, _ := nameArr[i].(string)
colType, _ := typeArr[i].(string)
matches := re.FindStringSubmatch(colType)
if len(matches) >= 2 {
size, _ := strconv.Atoi(matches[1])
embeddingCols = append(embeddingCols, [2]interface{}{colName, size})
}
}
}
}
// Transform chunks using helper function
insertChunks := make([]map[string]interface{}, len(chunks))
for i, chunk := range chunks {
insertChunks[i] = transformChunkFields(chunk, embeddingCols)
}
// Delete existing rows with matching IDs
if len(insertChunks) > 0 {
idList := make([]string, len(insertChunks))
for i, chunk := range insertChunks {
// is a UUID produced by the document ingestion path
// (uuid.NewString), not user input. We single-quote it
// for Infinity SQL; UUIDs cannot contain single quotes
// by construction (RFC 4122 §3).
// codeql[go/unsafe-quoting] False positive: chunk["id"]
idList[i] = fmt.Sprintf("'%v'", chunk["id"])
}
filter := fmt.Sprintf("id IN (%s)", strings.Join(idList, ", "))
common.Debug(fmt.Sprintf("Deleting existing rows with filter: %s", filter))
delResp, delErr := table.Delete(filter)
if delErr != nil {
common.Warn(fmt.Sprintf("Failed to delete existing rows: %v", delErr))
} else {
common.Info(fmt.Sprintf("Deleted %d existing rows", delResp.DeletedRows))
}
}
// Insert chunks to dataset
_, err = table.Insert(insertChunks)
if err != nil {
return nil, fmt.Errorf("Failed to insert chunks to dataset: %w", err)
}
common.Info("InfinityConnection.InsertChunks result", zap.String("tableName", tableName), zap.Int("count", len(insertChunks)))
return []string{}, nil
}
// UpdateChunks updates chunks in a dataset table
// Table name format: {baseName}_{datasetID}
func (e *infinityEngine) UpdateChunks(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, baseName string, datasetID string) error {
tableName := buildChunkTableName(baseName, datasetID)
common.Info("InfinityConnection.UpdateChunks called", zap.String("tableName", tableName), zap.Any("condition", condition))
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return fmt.Errorf("Failed to get database: %w", err)
}
table, err := db.GetTable(tableName)
if err != nil {
return fmt.Errorf("Failed to get table %s: %w", tableName, err)
}
// Get table columns
clmns := make(map[string]struct {
Type string
Default interface{}
})
colsResp, err := table.ShowColumns()
if err != nil {
return fmt.Errorf("Failed to get columns: %w", err)
}
result, ok := colsResp.(*infinity.QueryResult)
if ok {
if nameArr, ok := result.Data["name"]; ok {
if typeArr, ok := result.Data["type"]; ok {
if defArr, ok := result.Data["default"]; ok {
for i := 0; i < len(nameArr); i++ {
colName, _ := nameArr[i].(string)
colType, _ := typeArr[i].(string)
var colDefault interface{}
if i < len(defArr) {
colDefault = defArr[i]
}
clmns[colName] = struct {
Type string
Default interface{}
}{colType, colDefault}
}
}
}
}
}
// Build filter string from condition
filter := buildFilterFromCondition(condition, clmns)
// Process remove operation first
removeValue := make(map[string]interface{})
if removeData, ok := newValue["remove"].(map[string]interface{}); ok {
removeValue = removeData
}
delete(newValue, "remove")
// Transform new_value fields using helper function (no embeddings needed for update)
transformed := transformChunkFields(newValue, nil)
for k, v := range transformed {
newValue[k] = v
}
// Remove original fields that were transformed (they're now in transformed with new names/types)
// Also remove intermediate token fields that shouldn't be stored in Infinity
// This must match Python's delete list in infinity_conn.py
for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks",
"content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks",
"question_kwd", "question_tks"} {
delete(newValue, key)
}
// Handle remove operations if any
if len(removeValue) > 0 {
colToRemove := make([]string, 0, len(removeValue))
for k := range removeValue {
colToRemove = append(colToRemove, k)
}
colToRemove = append(colToRemove, "id")
// Query rows to be updated
queryResult, err := table.Output(colToRemove).Filter(filter).ToResult()
if err != nil {
common.Warn(fmt.Sprintf("Failed to query rows for remove operation: %v", err))
} else {
qr, ok := queryResult.(*infinity.QueryResult)
if ok && len(qr.Data) > 0 {
// Get the id column and columns to remove
idCol := qr.Data["id"]
removeOpt := make(map[string]map[string][]string) // column -> value -> [ids]
for colName, colData := range qr.Data {
if colName == "id" {
continue
}
removeVal := removeValue[colName]
for i, id := range idCol {
if i < len(colData) {
existingVal := colData[i]
if removeStr, ok := removeVal.(string); ok {
// Split existing value by ### and remove the target value
if existingStr, ok := existingVal.(string); ok {
parts := strings.Split(existingStr, "###")
var newParts []string
for _, p := range parts {
if p != removeStr {
newParts = append(newParts, p)
}
}
if len(newParts) != len(parts) {
idStr := fmt.Sprintf("'%s'", escapeFilterValue(fmt.Sprintf("%v", id)))
if removeOpt[colName] == nil {
removeOpt[colName] = make(map[string][]string)
}
removeOpt[colName][strings.Join(newParts, "###")] = append(removeOpt[colName][strings.Join(newParts, "###")], idStr)
}
}
}
}
}
}
// Execute remove updates
for colName, valueToIDs := range removeOpt {
for newVal, ids := range valueToIDs {
idFilter := filter + " AND id IN (" + strings.Join(ids, ", ") + ")"
common.Info(fmt.Sprintf("INFINITY remove update: table=%s, idFilter=%s, column=%s, newValue=%v", tableName, idFilter, colName, newVal))
_, err := table.Update(idFilter, map[string]interface{}{colName: newVal})
if err != nil {
common.Warn(fmt.Sprintf("Failed to remove value from column %s: %v", colName, err))
}
}
}
}
}
}
// Execute the main update
common.Info(fmt.Sprintf("INFINITY update: table=%s, filter=%s, newValue=%v", tableName, filter, newValue))
_, err = table.Update(filter, newValue)
if err != nil {
return fmt.Errorf("Failed to update chunks: %w", err)
}
common.Info("InfinityConnection.UpdateChunks completes", zap.String("tableName", tableName))
return nil
}
// DeleteChunks deletes chunks from a dataset table
// Table name format: {baseName}_{datasetID}
// condition specifies which chunks to delete
func (e *infinityEngine) DeleteChunks(ctx context.Context, condition map[string]interface{}, baseName string, datasetID string) (int64, error) {
tableName := buildChunkTableName(baseName, datasetID)
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return 0, fmt.Errorf("failed to get database: %w", err)
}
table, err := db.GetTable(tableName)
if err != nil {
common.Warn(fmt.Sprintf("Table %s does not exist, skipping delete", tableName))
return 0, nil
}
// Get table columns for building filter
clmns := make(map[string]struct {
Type string
Default interface{}
})
colsResp, err := table.ShowColumns()
if err != nil {
return 0, fmt.Errorf("failed to get columns: %w", err)
}
result, ok := colsResp.(*infinity.QueryResult)
if ok {
if nameArr, ok := result.Data["name"]; ok {
if typeArr, ok := result.Data["type"]; ok {
if defArr, ok := result.Data["default"]; ok {
for i := 0; i < len(nameArr); i++ {
colName, _ := nameArr[i].(string)
colType, _ := typeArr[i].(string)
var colDefault interface{}
if i < len(defArr) {
colDefault = defArr[i]
}
clmns[colName] = struct {
Type string
Default interface{}
}{colType, colDefault}
}
}
}
}
}
// Build filter from condition
filter := buildFilterFromCondition(condition, clmns)
delResp, err := table.Delete(filter)
if err != nil {
return 0, fmt.Errorf("failed to delete: %w", err)
}
return delResp.DeletedRows, nil
}
// Search searches the Infinity engine for matching chunks.
// It supports three matching types: MatchTextExpr (full-text), MatchDenseExpr (vector), and FusionExpr (combined).
// If no match expressions are provided, Search relies solely on filter (e.g., doc_id, available_int) to find results.
func (e *infinityEngine) Search(ctx context.Context, req *types.SearchRequest) (*types.SearchResult, error) {
types.LogSearchRequest("Infinity", req)
if len(req.IndexNames) == 0 {
return nil, fmt.Errorf("index names cannot be empty")
}
// Get retrieval parameters with defaults
pageSize := req.Limit
if pageSize <= 0 {
pageSize = 30
}
offset := req.Offset
if offset < 0 {
offset = 0
}
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return nil, fmt.Errorf("failed to get database: %w", err)
}
isSkillIndex := false
for _, idx := range req.IndexNames {
if strings.HasPrefix(idx, "skill_") {
isSkillIndex = true
break
}
}
var outputColumns []string
if isSkillIndex {
outputColumns = []string{
"skill_id", "space_id", "folder_id", "name", "tags", "description", "content",
"version", "status", "create_time", "update_time",
}
outputColumns = convertSelectFields(outputColumns, true)
} else {
outputColumns = []string{
"id", "doc_id", "kb_id", "content_ltks", "content_with_weight",
"title_tks", "docnm_kwd", "img_id", "available_int", "important_kwd",
"position_int", "page_num_int", "top_int", "chunk_order_int",
"create_timestamp_flt", "knowledge_graph_kwd", "question_kwd", "question_tks",
"doc_type_kwd", "mom_id", "tag_kwd", "pagerank_fea", "tag_feas",
}
outputColumns = convertSelectFields(outputColumns)
}
// Allow caller to override output columns (used by KG search, etc.)
if len(req.SelectFields) > 0 {
outputColumns = convertSelectFields(req.SelectFields)
}
hasTextMatch := false
hasVectorMatch := false
var matchText *types.MatchTextExpr
var matchDense *types.MatchDenseExpr
if req.MatchExprs != nil && len(req.MatchExprs) > 0 {
for _, expr := range req.MatchExprs {
if expr == nil {
continue
}
switch e := expr.(type) {
case string:
if e != "" {
hasTextMatch = true
matchText = &types.MatchTextExpr{
MatchingText: e,
TopN: pageSize,
}
}
case *types.MatchTextExpr:
if e.MatchingText != "" {
hasTextMatch = true
matchText = e
}
case *types.MatchDenseExpr:
if len(e.EmbeddingData) > 0 {
hasVectorMatch = true
matchDense = e
}
}
}
}
if hasTextMatch || hasVectorMatch {
if hasTextMatch {
outputColumns = append(outputColumns, "score()")
}
// similarity() is only allowed by Infinity when there is ONLY MATCH VECTOR.
// When both text and vector matches exist (hybrid search with Fusion),
// only score() is valid — Fusion produces a unified SCORE column.
if hasVectorMatch && !hasTextMatch {
outputColumns = append(outputColumns, "similarity()")
}
// Skill index does not have pagerank_fea and tag_feas columns
if !isSkillIndex {
if !slices.Contains(outputColumns, common.PAGERANK_FLD) {
outputColumns = append(outputColumns, common.PAGERANK_FLD)
}
if !slices.Contains(outputColumns, common.TAG_FLD) {
outputColumns = append(outputColumns, common.TAG_FLD)
}
}
}
if !slices.Contains(outputColumns, "row_id") && !slices.Contains(outputColumns, "row_id()") {
outputColumns = append(outputColumns, "row_id()")
}
// Strip score pseudo-columns when there's no match expression — Infinity
// rejects SCORE()/SCORE_FACTORS() without MATCH TEXT/TENSOR/Fusion with
// "InfinityException(3013)". This protects callers (e.g. the no-match
// fallback in retrieval.go) that reuse a SelectFields list containing
// "_score" across both matched and unmatched queries.
if !hasTextMatch && !hasVectorMatch {
filtered := outputColumns[:0]
for _, c := range outputColumns {
switch c {
case "_score", "SCORE", "score()", "similarity()":
continue
}
filtered = append(filtered, c)
}
outputColumns = filtered
}
outputColumns = convertSelectFields(outputColumns, isSkillIndex)
if hasVectorMatch && matchDense != nil && matchDense.VectorColumnName != "" {
outputColumns = append(outputColumns, matchDense.VectorColumnName)
}
var filterParts []string
if hasTextMatch || hasVectorMatch {
if req.Filter != nil {
if availInt, ok := req.Filter["available_int"]; ok {
filterParts = append(filterParts, fmt.Sprintf("available_int=%v", availInt))
} else if status, ok := req.Filter["status"]; ok {
filterParts = append(filterParts, fmt.Sprintf("status='%s'", status))
} else {
if isSkillIndex {
filterParts = append(filterParts, "status='1'")
} else {
filterParts = append(filterParts, "available_int=1")
}
}
} else {
if isSkillIndex {
filterParts = append(filterParts, "status='1'")
} else {
filterParts = append(filterParts, "available_int=1")
}
}
}
// Build filter string from req.Filter
if req.Filter != nil {
filterCopy := make(map[string]interface{})
for k, v := range req.Filter {
if k != "kb_id" {
filterCopy[k] = v
}
}
condStr := equivalentConditionToStr(filterCopy)
if condStr != "" {
filterParts = append(filterParts, condStr)
}
}
filterStr := strings.Join(filterParts, " AND ")
orderBy := req.OrderBy
var rankFeature map[string]float64
if req.RankFeature != nil {
rankFeature = req.RankFeature
}
var fusionExpr *types.FusionExpr
if len(req.MatchExprs) > 2 {
if fe, ok := req.MatchExprs[2].(*types.FusionExpr); ok {
fusionExpr = fe
}
}
var allResults []map[string]interface{}
totalHits := int64(0)
for _, indexName := range req.IndexNames {
var tableNames []string
if strings.HasPrefix(indexName, "ragflow_doc_meta_") {
tableNames = []string{indexName}
} else {
kbIDs := req.KbIDs
if len(kbIDs) == 0 {
kbIDs = []string{""}
}
for _, kbID := range kbIDs {
if kbID == "" {
tableNames = append(tableNames, indexName)
} else {
tableNames = append(tableNames, fmt.Sprintf("%s_%s", indexName, kbID))
}
}
}
// minMatch comes from matchText.ExtraOptions when set (Python parity).
// Mirrors rag/utils/infinity_conn.py which reads
// matchExpr.extra_options.get("minimum_should_match", 0.0) — for the
// English (non-Chinese) path, the Go Question() builder omits
// minimum_should_match, so the default is 0.0 to match Python's
// effective 0% threshold for English queries.
minMatch := 0.0
var questionText string
var vectorData []float64
textTopN := pageSize
var originalQuery string
if matchText != nil {
questionText = matchText.MatchingText
textTopN = int(matchText.TopN)
if matchText.ExtraOptions != nil {
if oq, ok := matchText.ExtraOptions["original_query"].(string); ok {
originalQuery = oq
}
if v, ok := matchText.ExtraOptions["minimum_should_match"]; ok {
switch x := v.(type) {
case float64:
minMatch = x
case int:
minMatch = float64(x)
case string:
s := strings.TrimSuffix(x, "%")
if pct, err := strconv.Atoi(s); err == nil {
minMatch = float64(pct) / 100
}
}
}
}
}
if matchDense != nil {
vectorData = matchDense.EmbeddingData
}
for _, tableName := range tableNames {
tbl, err := db.GetTable(tableName)
if err != nil {
continue
}
table := tbl.Output(outputColumns)
var textFields []string
if matchText != nil && len(matchText.Fields) > 0 {
textFields = matchText.Fields
} else if isSkillIndex {
textFields = []string{
"name^10",
"tags^5",
"description^3",
"content^1",
}
} else {
textFields = []string{
"title_tks^10",
"title_sm_tks^5",
"important_kwd^30",
"important_tks^20",
"question_tks^20",
"content_ltks^2",
"content_sm_ltks",
}
}
// Convert field names for Infinity
var convertedFields []string
for _, f := range textFields {
cf := convertMatchingField(f)
convertedFields = append(convertedFields, cf)
}
fields := strings.Join(convertedFields, ",")
hasTextMatch := questionText != ""
hasVectorMatch := len(vectorData) > 0
// Add text match if question is provided
if hasTextMatch {
extraOptions := map[string]string{
"minimum_should_match": fmt.Sprintf("%d%%", int(minMatch*100)),
}
if filterStr != "" {
extraOptions["filter"] = filterStr
}
if rankFeature != nil {
var rankFeaturesList []string
for featureName, weight := range rankFeature {
rankFeaturesList = append(rankFeaturesList, fmt.Sprintf("%s^%s^%.0f", common.TAG_FLD, featureName, weight))
}
if len(rankFeaturesList) > 0 {
extraOptions["rank_features"] = strings.Join(rankFeaturesList, ",")
}
}
if originalQuery != "" {
extraOptions["original_query"] = originalQuery
}
table = table.MatchText(fields, questionText, textTopN, extraOptions)
common.Debug(fmt.Sprintf(
"MatchTextExpr:\n"+
" fields=%s\n"+
" matching_text=%s\n"+
" topn=%d\n"+
" extra_options=%v",
fields, questionText, textTopN, extraOptions,
))
}
// Add vector match if provided
if hasVectorMatch {
vecFieldName := fmt.Sprintf("q_%d_vec", len(vectorData))
dataType := "float"
distanceType := "cosine"
if matchDense != nil {
if matchDense.VectorColumnName != "" {
vecFieldName = matchDense.VectorColumnName
}
if matchDense.EmbeddingDataType != "" {
dataType = matchDense.EmbeddingDataType
}
if matchDense.DistanceType != "" {
distanceType = matchDense.DistanceType
}
}
vectorTopN := pageSize
if matchDense != nil && matchDense.TopN > 0 {
vectorTopN = int(matchDense.TopN)
}
denseFilterStr := filterStr
if denseFilterStr == "" {
if isSkillIndex {
denseFilterStr = "status='1'"
} else {
denseFilterStr = "available_int=1"
}
}
if hasTextMatch {
fieldsStr := strings.Join(convertedFields, ",")
// Escape single quotes in user-controlled questionText
// before splicing into the filter_fulltext() call.
// fieldsStr is sourced from a fixed allowlist (see
// textFields above) and is not user-controlled.
safeQuery := strings.ReplaceAll(questionText, "'", "''")
safeFields := strings.ReplaceAll(fieldsStr, "'", "''")
filterFulltext := fmt.Sprintf("filter_fulltext('%s', '%s')", safeFields, safeQuery)
denseFilterStr = fmt.Sprintf("(%s) AND %s", denseFilterStr, filterFulltext)
}
threshold := "0.0"
if matchDense != nil && matchDense.ExtraOptions != nil {
if sim, ok := matchDense.ExtraOptions["similarity"].(float64); ok {
threshold = fmt.Sprintf("%g", sim)
} else if s, ok := matchDense.ExtraOptions["threshold"].(string); ok {
threshold = s
}
}
extraOptions := map[string]string{
"threshold": threshold,
"filter": denseFilterStr,
}
common.Debug("MatchDense for hybrid search",
zap.String("fieldName", vecFieldName),
zap.String("distanceType", distanceType),
zap.Int("topN", vectorTopN),
zap.Bool("hasFusion", fusionExpr != nil))
table = table.MatchDense(vecFieldName, vectorData, dataType, distanceType, vectorTopN, extraOptions)
}
// Add fusion (for text + vector combination)
if hasTextMatch && hasVectorMatch && fusionExpr != nil {
fusionMethod := fusionExpr.Method
fusionTopK := fusionExpr.TopN
if fusionTopK == 0 {
fusionTopK = pageSize
}
fusionParams := map[string]interface{}{
"normalize": "atan",
}
if fusionExpr.FusionParams != nil {
for k, v := range fusionExpr.FusionParams {
fusionParams[k] = v
}
}
common.Debug("Applying Fusion for hybrid search",
zap.String("method", fusionMethod),
zap.Int("topN", fusionTopK),
zap.Any("params", fusionParams))
table = table.Fusion(fusionMethod, fusionTopK, fusionParams)
}
// Add order_by if provided
if orderBy != nil && len(orderBy.Fields) > 0 {
var sortFields [][2]interface{}
for _, orderField := range orderBy.Fields {
sortType := infinity.SortTypeAsc
if orderField.Type == types.SortDesc {
sortType = infinity.SortTypeDesc
}
sortFields = append(sortFields, [2]interface{}{orderField.Field, sortType})
}
table = table.Sort(sortFields)
}
// Add filter when there's no text/vector match (like metadata queries)
if !hasTextMatch && !hasVectorMatch && filterStr != "" {
common.Debug(fmt.Sprintf("Adding filter for no-match query: %s", filterStr))
table = table.Filter(filterStr)
}
// Set limit and offset
table = table.Limit(pageSize)
if offset > 0 {
table = table.Offset(offset)
}
// Request total_hits_count from Infinity
table = table.Option(map[string]interface{}{"total_hits_count": true})
// Execute query
df, err := table.ToDataFrame()
if err != nil {
common.Warn("Infinity query failed",
zap.String("tableName", tableName),
zap.Bool("hasTextMatch", hasTextMatch),
zap.Bool("hasVectorMatch", hasVectorMatch),
zap.Bool("hasFusion", fusionExpr != nil),
zap.Error(err))
continue
}
// Convert DataFrame to chunks format (column-oriented to row-oriented)
searchChunks := make([]map[string]interface{}, 0)
for colName, colData := range df.ColumnData {
for i, val := range colData {
for len(searchChunks) <= i {
searchChunks = append(searchChunks, make(map[string]interface{}))
}
searchChunks[i][colName] = val
}
}
// Apply field name mapping and row_id handling
// Skill index uses different schema
// so we skip the document-specific field mappings
if !isSkillIndex {
applyFieldMappings(searchChunks)
} else {
// For skill index, only handle ROW_ID -> row_id() mapping
for _, chunk := range searchChunks {
if val, ok := chunk["ROW_ID"]; ok {
chunk["row_id()"] = val
delete(chunk, "ROW_ID")
}
}
}
// Parse total_hits_count from ExtraInfo
var tableTotal int64
if df.ExtraInfo != "" {
var extraResult map[string]interface{}
if err := json.Unmarshal([]byte(df.ExtraInfo), &extraResult); err == nil {
if count, ok := extraResult["total_hits_count"].(float64); ok {
tableTotal = int64(count)
}
}
}
searchResult := &types.SearchResult{
Chunks: searchChunks,
Total: tableTotal,
}
allResults = append(allResults, searchResult.Chunks...)
totalHits += searchResult.Total
}
}
if hasTextMatch || hasVectorMatch {
scoreColumn := ""
if hasTextMatch && hasVectorMatch {
scoreColumn = "SCORE"
} else if hasTextMatch {
scoreColumn = "SCORE"
} else if hasVectorMatch {
scoreColumn = "SIMILARITY"
}
pagerankField := common.PAGERANK_FLD
if isSkillIndex {
pagerankField = "" // Skill index has no pagerank field
}
allResults = calculateScores(allResults, scoreColumn, pagerankField)
allResults = sortByScore(allResults, len(allResults))
}
if len(allResults) > pageSize {
allResults = allResults[:pageSize]
}
common.Debug("Search in Infinity completed", zap.Int("returnedRows", len(allResults)), zap.Int64("totalHits", totalHits))
return &types.SearchResult{
Chunks: allResults,
Total: totalHits,
}, nil
}
// GetChunk gets a chunk by ID
func (e *infinityEngine) GetChunk(ctx context.Context, tableName, chunkID string, datasetIDs []string) (interface{}, error) {
if e.client == nil || e.client.conn == nil {
return nil, fmt.Errorf("Infinity client not initialized")
}
common.Info("Infinity get chunk start",
zap.String("chunkID", chunkID),
zap.String("tableName", tableName),
zap.Strings("datasetIDs", datasetIDs))
// Build list of table names to search
tableNames := make([]string, 0, len(datasetIDs))
for _, datasetID := range datasetIDs {
tableNames = append(tableNames, fmt.Sprintf("%s_%s", tableName, datasetID))
}
// Try each table and collect results from all tables
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return nil, fmt.Errorf("failed to get database: %w", err)
}
// Collect chunks from all tables (same as Python's concat_dataframes)
allChunks := make(map[string]map[string]interface{})
for _, tblName := range tableNames {
table, err := db.GetTable(tblName)
if err != nil {
continue
}
// Query with filter for the specific chunk ID
filter := fmt.Sprintf("id = '%s'", chunkID)
result, err := table.Output([]string{"*"}).Filter(filter).ToResult()
if err != nil {
continue
}
qr, ok := result.(*infinity.QueryResult)
if !ok {
continue
}
if len(qr.Data) == 0 {
continue
}
// Convert to chunk format
chunks := make([]map[string]interface{}, 0)
for colName, colData := range qr.Data {
for i, val := range colData {
for len(chunks) <= i {
chunks = append(chunks, make(map[string]interface{}))
}
chunks[i][colName] = val
}
}
// Merge chunks into allChunks (by id), keeping first non-empty value
for _, chunk := range chunks {
if idVal, ok := chunk["id"].(string); ok {
if existing, exists := allChunks[idVal]; exists {
// Merge: keep first non-empty value for each field
for k, v := range chunk {
if _, has := existing[k]; !has || (utility.IsEmpty(existing[k]) && !utility.IsEmpty(v)) {
existing[k] = v
}
}
} else {
allChunks[idVal] = chunk
}
}
}
}
// Get the chunk by chunkID
chunk, found := allChunks[chunkID]
if !found {
return nil, nil
}
common.Debug("infinity get chunk", zap.String("chunkID", chunkID), zap.Any("tables", tableNames))
// Apply field mappings (same as in GetFields)
// docnm -> docnm_kwd, title_tks, title_sm_tks
if val, ok := chunk["docnm"].(string); ok {
chunk["docnm_kwd"] = val
chunk["title_tks"] = val
chunk["title_sm_tks"] = val
}
// content -> content_with_weight, content_ltks, content_sm_ltks
if val, ok := chunk["content"].(string); ok {
chunk["content_with_weight"] = val
chunk["content_ltks"] = val
chunk["content_sm_ltks"] = val
}
// important_keywords -> important_kwd (split by comma), important_tks
if val, ok := chunk["important_keywords"].(string); ok {
if val == "" {
chunk["important_kwd"] = []interface{}{}
} else {
parts := strings.Split(val, ",")
chunk["important_kwd"] = parts
}
chunk["important_tks"] = val
} else {
chunk["important_kwd"] = []interface{}{}
chunk["important_tks"] = []interface{}{}
}
// questions -> question_kwd (split by newline), question_tks
if val, ok := chunk["questions"].(string); ok {
if val == "" {
chunk["question_kwd"] = []interface{}{}
} else {
parts := strings.Split(val, "\n")
chunk["question_kwd"] = parts
}
chunk["question_tks"] = val
} else {
chunk["question_kwd"] = []interface{}{}
chunk["question_tks"] = []interface{}{}
}
if posVal, ok := chunk["position_int"].(string); ok {
chunk["position_int"] = utility.ConvertHexToPositionIntArray(posVal)
} else {
chunk["position_int"] = []interface{}{}
}
return chunk, nil
}
// applyFieldMappings applies field mappings to chunks (side-effect only).
// Used by Search() to mutate chunks with derived fields before returning.
func applyFieldMappings(chunks []map[string]interface{}) {
for _, chunk := range chunks {
// docnm -> docnm_kwd, title_tks, title_sm_tks
if val, ok := chunk["docnm"].(string); ok {
chunk["docnm_kwd"] = val
chunk["title_tks"] = val
chunk["title_sm_tks"] = val
}
// important_keywords -> important_kwd (split by comma/semicolon/Chinese punctuations), important_tks
if val, ok := chunk["important_keywords"].(string); ok {
if val == "" {
chunk["important_kwd"] = []interface{}{}
} else {
parts := ChinesePunctRegex.Split(val, -1)
chunk["important_kwd"] = parts
}
chunk["important_tks"] = val
} else {
chunk["important_kwd"] = []interface{}{}
chunk["important_tks"] = []interface{}{}
}
// questions -> question_kwd (split by newline), question_tks
if val, ok := chunk["questions"].(string); ok {
if val == "" {
chunk["question_kwd"] = []interface{}{}
} else {
parts := strings.Split(val, "\n")
chunk["question_kwd"] = parts
}
chunk["question_tks"] = val
} else {
chunk["question_kwd"] = []interface{}{}
chunk["question_tks"] = []interface{}{}
}
// content -> content_with_weight, content_ltks, content_sm_ltks
if val, ok := chunk["content"].(string); ok {
chunk["content_with_weight"] = val
chunk["content_ltks"] = val
chunk["content_sm_ltks"] = val
}
// authors -> authors_tks, authors_sm_tks
if val, ok := chunk["authors"].(string); ok {
chunk["authors_tks"] = val
chunk["authors_sm_tks"] = val
}
if val, ok := chunk["message_type_kwd"]; ok {
chunk["message_type"] = val
}
if val, ok := chunk["status_int"]; ok {
chunk["status"] = memoryMessageStatusBool(val)
}
// position_int: convert from hex string to array format (grouped by 5)
if val, ok := chunk["position_int"].(string); ok {
chunk["position_int"] = utility.ConvertHexToPositionIntArray(val)
}
// Convert page_num_int and top_int from hex string to array
for _, colName := range []string{"page_num_int", "top_int"} {
if val, ok := chunk[colName].(string); ok && val != "" {
chunk[colName] = utility.ConvertHexToIntArray(val)
}
}
// Post-process: convert nil/empty values to empty slices for array-like fields
// and split _kwd fields by "###" (except knowledge_graph_kwd, docnm_kwd, important_kwd, question_kwd)
kwdNoSplit := map[string]bool{
"knowledge_graph_kwd": true, "docnm_kwd": true,
"important_kwd": true, "question_kwd": true,
}
arrayFields := []string{
"important_kwd", "important_tks", "question_tks",
"question_kwd", "authors_tks", "authors_sm_tks", "title_tks",
"title_sm_tks", "content_ltks", "content_sm_ltks", "tag_kwd",
}
for _, colName := range arrayFields {
val, ok := chunk[colName]
if !ok || val == nil || val == "" {
chunk[colName] = []interface{}{}
} else if !kwdNoSplit[colName] {
// Split by "###" for _kwd fields
if strVal, ok := val.(string); ok && strings.Contains(strVal, "###") {
parts := strings.Split(strVal, "###")
var filtered []interface{}
for _, p := range parts {
if p != "" {
filtered = append(filtered, p)
}
}
chunk[colName] = filtered
}
}
}
// Handle row_id mapping - Infinity returns "ROW_ID" but we use "row_id()"
if val, ok := chunk["ROW_ID"]; ok {
chunk["row_id()"] = val
delete(chunk, "ROW_ID")
}
}
}
func memoryMessageStatusBool(value interface{}) bool {
switch v := value.(type) {
case bool:
return v
case int:
return v != 0
case int64:
return v != 0
case float64:
return v != 0
case json.Number:
n, err := v.Int64()
return err == nil && n != 0
case string:
return v != "" && v != "0" && !strings.EqualFold(v, "false")
default:
return false
}
}
// GetFields extracts the requested fields from Infinity search results
func (e *infinityEngine) GetFields(chunks []map[string]interface{}, fields []string) map[string]map[string]interface{} {
result := make(map[string]map[string]interface{})
// Python: if not fields, return {}
if len(fields) == 0 {
return result
}
if len(chunks) == 0 {
return result
}
// Build field set for lookup (Python lines 713-715)
fieldsAll := make(map[string]bool)
for _, f := range fields {
fieldsAll[f] = true
}
fieldsAll["id"] = true
// noneColumns is rebuilt per chunk inside the loop below. The
// per-chunk "missing → nil" map MUST be fresh for every iteration; if
// it's reused, the first chunk that contains a field removes it from
// the shared set, and later chunks missing that same field silently
// stop getting the nil placeholder, producing inconsistent shapes
// per document.
// Check if important_kwd is needed (for empty_count handling)
needImportantKwdEmptyCount := fieldsAll["important_kwd"]
for _, chunk := range chunks {
// Build column map for case-insensitive lookup (Python line 747)
columnMap := make(map[string]string)
for k := range chunk {
columnMap[strings.ToLower(k)] = k
}
// Apply field mappings first (to get derived fields)
// docnm -> docnm_kwd, title_tks, title_sm_tks (Python lines 716-719)
// Note: Python checks "docnm" in res.columns regardless of whether fields were requested
if val, ok := chunk["docnm"].(string); ok {
if fieldsAll["docnm_kwd"] {
chunk["docnm_kwd"] = val
}
if fieldsAll["title_tks"] {
chunk["title_tks"] = val
}
if fieldsAll["title_sm_tks"] {
chunk["title_sm_tks"] = val
}
}
// important_keywords -> important_kwd (split by comma), important_tks (Python lines 720-732)
// Python: v.split(",") if v else [] — empty string yields empty list
if fieldsAll["important_kwd"] || fieldsAll["important_tks"] {
if val, ok := chunk["important_keywords"].(string); ok && val != "" {
if fieldsAll["important_kwd"] {
if needImportantKwdEmptyCount {
// Check for important_kwd_empty_count (Python lines 722-728)
if emptyCountVal, hasEmptyCount := chunk["important_kwd_empty_count"]; hasEmptyCount {
tokens := strings.Split(val, ",")
var emptyCount int
switch v := emptyCountVal.(type) {
case float64:
emptyCount = int(v)
case int:
emptyCount = v
case string:
emptyCount, _ = strconv.Atoi(v)
}
kwdList := make([]interface{}, 0, len(tokens)+emptyCount)
for _, t := range tokens {
kwdList = append(kwdList, t)
}
for i := 0; i < emptyCount; i++ {
kwdList = append(kwdList, "")
}
chunk["important_kwd"] = kwdList
} else {
parts := strings.Split(val, ",")
kwdList := make([]interface{}, len(parts))
for i, p := range parts {
kwdList[i] = p
}
chunk["important_kwd"] = kwdList
}
} else {
parts := strings.Split(val, ",")
kwdList := make([]interface{}, len(parts))
for i, p := range parts {
kwdList[i] = p
}
chunk["important_kwd"] = kwdList
}
}
if fieldsAll["important_tks"] {
chunk["important_tks"] = val
}
} else {
if fieldsAll["important_kwd"] {
chunk["important_kwd"] = []interface{}{}
}
if fieldsAll["important_tks"] {
chunk["important_tks"] = []interface{}{}
}
}
}
// questions -> question_kwd (split by newline), question_tks (Python lines 733-737)
// Python: v.splitlines() — empty string yields empty list
if fieldsAll["question_kwd"] || fieldsAll["question_tks"] {
if val, ok := chunk["questions"].(string); ok && val != "" {
if fieldsAll["question_kwd"] {
parts := strings.Split(val, "\n")
qList := make([]interface{}, len(parts))
for i, p := range parts {
qList[i] = p
}
chunk["question_kwd"] = qList
}
if fieldsAll["question_tks"] {
chunk["question_tks"] = val
}
} else {
if fieldsAll["question_kwd"] {
chunk["question_kwd"] = []interface{}{}
}
if fieldsAll["question_tks"] {
chunk["question_tks"] = []interface{}{}
}
}
}
// content -> content_with_weight, content_ltks, content_sm_ltks (Python lines 738-741)
if fieldsAll["content_with_weight"] || fieldsAll["content_ltks"] || fieldsAll["content_sm_ltks"] {
if val, ok := chunk["content"].(string); ok {
if fieldsAll["content_with_weight"] {
chunk["content_with_weight"] = val
}
if fieldsAll["content_ltks"] {
chunk["content_ltks"] = val
}
if fieldsAll["content_sm_ltks"] {
chunk["content_sm_ltks"] = val
}
}
}
// authors -> authors_tks, authors_sm_tks (Python lines 742-745)
if fieldsAll["authors_tks"] || fieldsAll["authors_sm_tks"] {
if val, ok := chunk["authors"].(string); ok {
if fieldsAll["authors_tks"] {
chunk["authors_tks"] = val
}
if fieldsAll["authors_sm_tks"] {
chunk["authors_sm_tks"] = val
}
}
}
// Post-process fields matching Python lines 758-780
// This single loop processes all column transformations in Python order
kwdNoSplit := map[string]bool{
"knowledge_graph_kwd": true, "docnm_kwd": true,
"important_kwd": true, "question_kwd": true,
}
for field, val := range chunk {
fieldLower := strings.ToLower(field)
// field_keyword: split by "###" (Python lines 760-761)
needsSplit := false
if fieldLower == "source_id" {
needsSplit = true
} else if strings.HasSuffix(fieldLower, "_kwd") && !kwdNoSplit[fieldLower] {
needsSplit = true
}
if needsSplit {
if strVal, ok := val.(string); ok && strings.Contains(strVal, "###") {
parts := strings.Split(strVal, "###")
var filtered []interface{}
for _, p := range parts {
if p != "" {
filtered = append(filtered, p)
}
}
chunk[field] = filtered
}
continue
}
// _feas: JSON parse (Python lines 762-763)
if strings.HasSuffix(fieldLower, "_feas") {
if strVal, ok := val.(string); ok && strVal != "" {
var parsed interface{}
if err := json.Unmarshal([]byte(strVal), &parsed); err == nil {
chunk[field] = parsed
}
} else {
chunk[field] = map[string]interface{}{}
}
continue
}
// chunk_data: JSON parse (Python lines 764-766)
if fieldLower == "chunk_data" {
if strVal, ok := val.(string); ok && strVal != "" {
var parsed interface{}
if err := json.Unmarshal([]byte(strVal), &parsed); err == nil {
chunk[field] = parsed
}
} else if val == nil {
// Keep nil
}
continue
}
// position_int: hex decode with grouping by 5 (Python lines 767-776)
if fieldLower == "position_int" && fieldsAll[fieldLower] {
// If already converted to slice by applyFieldMappings, skip
if _, isSlice := val.([]interface{}); isSlice {
continue
}
// applyFieldMappings returns [][]int, check that too
if _, isIntSlice := val.([][]int); isIntSlice {
continue
}
if strVal, ok := val.(string); ok && strVal != "" {
chunk[field] = utility.ConvertHexToPositionIntArray(strVal)
} else {
chunk[field] = []interface{}{}
}
continue
}
// page_num_int, top_int: hex decode (Python lines 777-778)
if (fieldLower == "page_num_int" || fieldLower == "top_int") && fieldsAll[fieldLower] {
// If already converted to slice by applyFieldMappings, skip
if _, isSlice := val.([]interface{}); isSlice {
continue
}
// applyFieldMappings returns []int, check that too
if _, isIntSlice := val.([]int); isIntSlice {
continue
}
if strVal, ok := val.(string); ok && strVal != "" {
chunk[field] = utility.ConvertHexToIntArray(strVal)
} else {
chunk[field] = []interface{}{}
}
continue
}
}
// Handle row_id mapping (Python lines 748-750)
if fieldsAll["row_id()"] {
if lowerKey, ok := columnMap["row_id"]; ok {
chunk["row_id()"] = chunk[lowerKey]
}
}
// Delete base columns after mapping (Python lines 781-783)
for _, col := range []string{"docnm", "important_keywords", "questions", "content", "authors"} {
delete(chunk, col)
}
// Build result map keyed by id
if idVal, ok := chunk["id"].(string); ok {
fieldMap := make(map[string]interface{})
// Rebuild noneColumns for this chunk so that fields missing
// from THIS chunk get a nil placeholder. Reusing a set across
// chunks would let the first chunk's contents permanently
// remove keys, leaving later chunks with inconsistent shapes.
noneColumns := make(map[string]bool, len(fieldsAll))
for f := range fieldsAll {
noneColumns[strings.ToLower(f)] = true
}
for field, value := range chunk {
if fieldsAll[field] {
fieldMap[field] = value
delete(noneColumns, strings.ToLower(field))
}
}
// Set none_columns to None (Python lines 784-785)
for col := range noneColumns {
fieldMap[col] = nil
}
result[idVal] = fieldMap
}
}
return result
}
// GetAggregation aggregates chunk values by field name.
// Input: [{"docnm_kwd": "docA"}, {"docnm_kwd": "docA"}, {"docnm_kwd": "docB"}]
//
// GetAggregation(chunks, "docnm_kwd") returns:
//
// [{"key": "docA", "count": 2}, {"key": "docB", "count": 1}]
//
// For tag_kwd field, splits values by "###" separator.
// For other fields, uses comma separation.
func (e *infinityEngine) GetAggregation(chunks []map[string]interface{}, fieldName string) []map[string]interface{} {
if len(chunks) == 0 {
return []map[string]interface{}{}
}
// Check if field exists in first chunk
hasField := false
for _, chunk := range chunks {
if _, ok := chunk[fieldName]; ok {
hasField = true
break
}
}
if !hasField {
return []map[string]interface{}{}
}
// Count occurrences
tagCounts := make(map[string]int)
for _, chunk := range chunks {
value, ok := chunk[fieldName]
if !ok || value == nil {
continue
}
// Handle string value
if valueStr, ok := value.(string); ok {
if valueStr == "" {
continue
}
var tags []string
// Split by "###" for tag_kwd field
if fieldName == "tag_kwd" && strings.Contains(valueStr, "###") {
for _, tag := range strings.Split(valueStr, "###") {
tag = strings.TrimSpace(tag)
if tag != "" {
tags = append(tags, tag)
}
}
} else {
// Fallback to comma separation
for _, tag := range strings.Split(valueStr, ",") {
tag = strings.TrimSpace(tag)
if tag != "" {
tags = append(tags, tag)
}
}
}
for _, tag := range tags {
tagCounts[tag]++
}
continue
}
// Handle list value
if valueList, ok := value.([]interface{}); ok {
for _, item := range valueList {
if itemStr, ok := item.(string); ok {
tag := strings.TrimSpace(itemStr)
if tag != "" {
tagCounts[tag]++
}
}
}
}
}
if len(tagCounts) == 0 {
return []map[string]interface{}{}
}
// Convert to slice and sort by count descending
type tagCountPair struct {
tag string
count int
}
pairs := make([]tagCountPair, 0, len(tagCounts))
for tag, count := range tagCounts {
pairs = append(pairs, tagCountPair{tag, count})
}
sort.Slice(pairs, func(i, j int) bool {
return pairs[i].count > pairs[j].count
})
// Convert to []map[string]interface{} directly
result := make([]map[string]interface{}, len(pairs))
for i, p := range pairs {
result[i] = map[string]interface{}{"key": p.tag, "count": p.count}
}
return result
}
// GetChunkIDs extracts chunk IDs from Infinity search results.
func (e *infinityEngine) GetChunkIDs(chunks []map[string]interface{}) []string {
ids := make([]string, 0, len(chunks))
for _, chunk := range chunks {
if id, ok := chunk["id"].(string); ok {
ids = append(ids, id)
}
}
return ids
}
// GetHighlight generates highlighted text snippets for search results.
// Matches keywords in text and wraps them with <em> tags.
func (e *infinityEngine) GetHighlight(chunks []map[string]interface{}, keywords []string, fieldName string) map[string]string {
result := make(map[string]string)
if len(chunks) == 0 || len(keywords) == 0 {
return result
}
// For Infinity, scores are already returned in search results (_score column)
// So GetScores just extracts scores from chunks, mimicking Python's approach
return result
}
// KNNScores for Infinity - since Infinity normalizes scores during fusion,
// we just need to return a result structure that GetScores can parse.
// This matches Python's approach where Infinity doesn't use the two-pass KNN.
func (e *infinityEngine) KNNScores(ctx context.Context, chunks []map[string]interface{}, queryVector []float64, topK int) (map[string]interface{}, error) {
if len(chunks) == 0 {
return nil, nil
}
// Build a result structure that GetScores can parse
// For Infinity, scores are already in _score field from the first search
result := make(map[string]interface{})
hitList := make([]interface{}, 0, len(chunks))
for _, chunk := range chunks {
if id, ok := chunk["id"].(string); ok {
hit := map[string]interface{}{
"_id": id,
"_score": chunk["_score"],
}
hitList = append(hitList, hit)
}
}
result["hits"] = map[string]interface{}{
"hits": hitList,
}
return result, nil
}
// GetScores extracts similarity scores from KNN search result.
// For Infinity, it parses the result from KNNScores and extracts _score values.
func (e *infinityEngine) GetScores(knnResult map[string]interface{}) map[string]float64 {
scores := make(map[string]float64)
hits, ok := knnResult["hits"].(map[string]interface{})
if !ok {
return scores
}
hitList, ok := hits["hits"].([]interface{})
if !ok {
return scores
}
for _, h := range hitList {
hit, ok := h.(map[string]interface{})
if !ok {
continue
}
docID, ok := hit["_id"].(string)
if !ok {
continue
}
scoreVal := hit["_score"]
if scoreVal == nil {
scores[docID] = 0.0
continue
}
score, ok := scoreVal.(float64)
if !ok {
scores[docID] = 0.0
continue
}
scores[docID] = score
}
return scores
}
// convertSelectFields converts field names to Infinity format
// isSkillIndex indicates if this is a skill index (uses skill_id instead of id)
//
// Does NOT mutate the input slice — callers (e.g. retrieval.go) reuse the same
// SelectFields list both for Search() and GetFields(); mutating it would
// replace logical names like "content_with_weight" with their Infinity column
// names ("content"), breaking GetFields's field-presence checks.
func convertSelectFields(output []string, isSkillIndex ...bool) []string {
fieldMapping := map[string]string{
"docnm_kwd": "docnm",
"title_tks": "docnm",
"title_sm_tks": "docnm",
"important_kwd": "important_keywords",
"important_tks": "important_keywords",
"question_kwd": "questions",
"question_tks": "questions",
"content_with_weight": "content",
"content_ltks": "content",
"content_sm_ltks": "content",
"authors_tks": "authors",
"authors_sm_tks": "authors",
"message_type": "message_type_kwd",
"status": "status_int",
}
skillIndex := false
if len(isSkillIndex) > 0 {
skillIndex = isSkillIndex[0]
}
// Copy + map without mutating the caller's slice.
mapped := make([]string, len(output))
needEmptyCount := false
for i, field := range output {
if field == "important_kwd" {
needEmptyCount = true
}
if newField, ok := fieldMapping[field]; ok {
mapped[i] = newField
} else {
mapped[i] = field
}
}
// Remove duplicates
seen := make(map[string]bool)
result := []string{}
for _, f := range mapped {
if f != "" && !seen[f] {
seen[f] = true
result = append(result, f)
}
}
// Add id and empty count if needed
// For skill index, use skill_id instead of id
hasID := false
idField := "id"
if skillIndex {
idField = "skill_id"
}
for _, f := range result {
if f == idField {
hasID = true
break
}
}
if !hasID {
result = append([]string{idField}, result...)
}
if needEmptyCount {
result = append(result, "important_kwd_empty_count")
}
return result
}
// convertMatchingField converts field names for matching
// For regular document indices: maps _tks/_kwd fields to column@index_name format
// For skill indices: maps raw field names to column@index_name format
// Infinity requires column@index_name when a column has multiple full-text indexes
func convertMatchingField(fieldWeightStr string) string {
// Split on ^ to get field name
parts := strings.Split(fieldWeightStr, "^")
field := parts[0]
// Field name conversion
fieldMapping := map[string]string{
"docnm_kwd": "docnm@ft_docnm_rag_coarse",
"title_tks": "docnm@ft_docnm_rag_coarse",
"title_sm_tks": "docnm@ft_docnm_rag_fine",
"important_kwd": "important_keywords@ft_important_keywords_rag_coarse",
"important_tks": "important_keywords@ft_important_keywords_rag_fine",
"question_kwd": "questions@ft_questions_rag_coarse",
"question_tks": "questions@ft_questions_rag_fine",
"content_with_weight": "content@ft_content_rag_coarse",
"content_ltks": "content@ft_content_rag_coarse",
"content_sm_ltks": "content@ft_content_rag_fine",
"authors_tks": "authors@ft_authors_rag_coarse",
"authors_sm_tks": "authors@ft_authors_rag_fine",
"tag_kwd": "tag_kwd@ft_tag_kwd_whitespace__",
"toc_kwd": "toc_kwd@ft_toc_kwd_whitespace__",
// Skill index fields
"name": "name@ft_name_rag_coarse",
"tags": "tags@ft_tags_rag_coarse",
"description": "description@ft_description_rag_coarse",
"content": "content@ft_content_rag_coarse",
}
if newField, ok := fieldMapping[field]; ok {
parts[0] = newField
}
return strings.Join(parts, "^")
}
// escapeFilterValue escapes single quotes for filter values
func escapeFilterValue(s string) string {
return strings.ReplaceAll(s, "'", "''")
}
// equivalentConditionToStr converts a condition map to an Infinity filter string
func equivalentConditionToStr(condition map[string]interface{}) string {
if len(condition) == 0 {
return ""
}
var cond []string
for k, v := range condition {
if k == "_id" || utility.IsEmpty(v) {
continue
}
// Handle must_not specially
if k == "must_not" {
if m, ok := v.(map[string]interface{}); ok {
for kk, vv := range m {
if kk == "exists" {
// For must_not exists, use !='' since we don't have table schema
cond = append(cond, fmt.Sprintf("NOT (%v!='')", vv))
}
}
}
continue
}
// Handle exists specially (without table schema, use string comparison)
if k == "exists" {
cond = append(cond, fmt.Sprintf("%v!=''", v))
continue
}
// Handle keyword fields (using full-text filter)
if fieldKeyword(k) {
// For keyword fields, values are always treated as strings for filter_fulltext
switch val := v.(type) {
case []string:
var inCond []string
for _, item := range val {
inCond = append(inCond, fmt.Sprintf("filter_fulltext('%s', '%s')",
convertMatchingField(k), escapeFilterValue(item)))
}
if len(inCond) > 0 {
cond = append(cond, "("+strings.Join(inCond, " or ")+")")
}
case []interface{}:
var inCond []string
for _, item := range val {
if s, ok := item.(string); ok {
inCond = append(inCond, fmt.Sprintf("filter_fulltext('%s', '%s')",
convertMatchingField(k), escapeFilterValue(s)))
} else {
inCond = append(inCond, fmt.Sprintf("filter_fulltext('%s', '%s')",
convertMatchingField(k), escapeFilterValue(fmt.Sprintf("%v", item))))
}
}
if len(inCond) > 0 {
cond = append(cond, "("+strings.Join(inCond, " or ")+")")
}
case string:
cond = append(cond, fmt.Sprintf("filter_fulltext('%s', '%s')",
convertMatchingField(k), escapeFilterValue(val)))
default:
cond = append(cond, fmt.Sprintf("filter_fulltext('%s', '%s')",
convertMatchingField(k), escapeFilterValue(fmt.Sprintf("%v", v))))
}
continue
}
if k == "message_type" {
k = "message_type_kwd"
} else if k == "status" {
k = "status_int"
}
// Handle list values (mixed types - strings get quotes, numbers don't)
if list, ok := v.([]interface{}); ok && len(list) > 0 {
var strItems, numItems []string
for _, item := range list {
if s, ok := item.(string); ok {
strItems = append(strItems, fmt.Sprintf("'%s'", escapeFilterValue(s)))
} else if n, ok := item.(int); ok {
numItems = append(numItems, strconv.Itoa(n))
} else if n, ok := item.(int64); ok {
numItems = append(numItems, strconv.FormatInt(n, 10))
} else if f, ok := item.(float64); ok {
numItems = append(numItems, strconv.FormatFloat(f, 'f', -1, 64))
} else if s, ok := item.(fmt.Stringer); ok {
strItems = append(strItems, fmt.Sprintf("'%s'", escapeFilterValue(s.String())))
} else {
strItems = append(strItems, fmt.Sprintf("'%s'", escapeFilterValue(fmt.Sprintf("%v", item))))
}
}
if len(strItems) > 0 {
if len(strItems) == 1 {
cond = append(cond, fmt.Sprintf("%s=%s", k, strItems[0]))
} else {
cond = append(cond, fmt.Sprintf("%s IN (%s)", k, strings.Join(strItems, ", ")))
}
}
if len(numItems) > 0 {
if len(numItems) == 1 {
cond = append(cond, fmt.Sprintf("%s=%s", k, numItems[0]))
} else {
cond = append(cond, fmt.Sprintf("%s IN (%s)", k, strings.Join(numItems, ", ")))
}
}
continue
}
if list, ok := v.([]string); ok && len(list) > 0 {
if len(list) == 1 {
cond = append(cond, fmt.Sprintf("%s='%s'", k, escapeFilterValue(list[0])))
} else {
var items []string
for _, item := range list {
items = append(items, fmt.Sprintf("'%s'", escapeFilterValue(item)))
}
cond = append(cond, fmt.Sprintf("%s IN (%s)", k, strings.Join(items, ", ")))
}
continue
}
if list, ok := v.([]int); ok && len(list) > 0 {
if len(list) == 1 {
cond = append(cond, fmt.Sprintf("%s=%d", k, list[0]))
} else {
var strs []string
for _, n := range list {
strs = append(strs, strconv.Itoa(n))
}
cond = append(cond, fmt.Sprintf("%s IN (%s)", k, strings.Join(strs, ", ")))
}
continue
}
// Handle numeric values (no quotes)
if utility.IsNumericValue(v) {
cond = append(cond, fmt.Sprintf("%s=%v", k, v))
continue
}
// Handle string values (with quotes and escaping)
if str, ok := v.(string); ok {
cond = append(cond, fmt.Sprintf("%s='%s'", k, escapeFilterValue(str)))
continue
}
// Fallback: treat as string
cond = append(cond, fmt.Sprintf("%s='%s'", k, escapeFilterValue(fmt.Sprintf("%v", v))))
}
if len(cond) == 0 {
return ""
}
return strings.Join(cond, " AND ")
}
// calculateScores calculates _score = score_column + pagerank
func calculateScores(chunks []map[string]interface{}, scoreColumn, pagerankField string) []map[string]interface{} {
for i := range chunks {
score := 0.0
if scoreVal, ok := chunks[i][scoreColumn]; ok {
if f, ok := utility.ToFloat64(scoreVal); ok {
score += f
}
}
if pagerankField != "" {
if prVal, ok := chunks[i][pagerankField]; ok {
if f, ok := utility.ToFloat64(prVal); ok {
score += f
}
}
}
chunks[i]["_score"] = score
}
return chunks
}
// sortByScore sorts by _score descending and limits
func sortByScore(chunks []map[string]interface{}, limit int) []map[string]interface{} {
if len(chunks) == 0 {
return chunks
}
// Sort by _score descending
sort.Slice(chunks, func(i, j int) bool {
scoreI := getChunkScore(chunks[i])
scoreJ := getChunkScore(chunks[j])
return scoreI > scoreJ
})
// Limit
if len(chunks) > limit && limit > 0 {
chunks = chunks[:limit]
}
return chunks
}
// getChunkScore extracts the score from a chunk
func getChunkScore(chunk map[string]interface{}) float64 {
if v, ok := chunk["_score"].(float64); ok {
return v
}
if v, ok := chunk["SCORE"].(float64); ok {
return v
}
if v, ok := chunk["SIMILARITY"].(float64); ok {
return v
}
return 0.0
}
// transformChunkFields converts chunk field names to Infinity format.
// Converts internal field names (like docnm_kwd) to Infinity column names (docnm).
// Also handles:
// - kb_id: extracts first element if it's a list
// - position_int, page_num_int, top_int: converts arrays to hex strings
// - tag_kwd: joins with ### separator
// - question_kwd: joins with newline separator
// - chunk_data: dict -> JSON string
// - Missing embeddings filled with zeros if embeddingCols provided
func transformChunkFields(chunk map[string]interface{}, embeddingCols [][2]interface{}) map[string]interface{} {
d := make(map[string]interface{})
for k, v := range chunk {
switch k {
case "docnm_kwd":
d["docnm"] = v
case "title_kwd":
if _, exists := chunk["docnm_kwd"]; !exists {
d["docnm"] = utility.ConvertToString(v)
}
case "title_sm_tks":
if _, exists := chunk["docnm_kwd"]; !exists {
d["docnm"] = utility.ConvertToString(v)
}
case "important_kwd":
if list, ok := v.([]interface{}); ok {
emptyCount := 0
tokens := make([]string, 0)
for _, item := range list {
if str, ok := item.(string); ok {
if str == "" {
emptyCount++
} else {
tokens = append(tokens, str)
}
}
}
d["important_keywords"] = strings.Join(tokens, ",")
d["important_kwd_empty_count"] = emptyCount
} else {
d["important_keywords"] = utility.ConvertToString(v)
}
case "important_tks":
if _, exists := chunk["important_kwd"]; !exists {
d["important_keywords"] = v
}
case "content_with_weight":
d["content"] = v
case "content_ltks":
if _, exists := chunk["content_with_weight"]; !exists {
d["content"] = v
}
case "content_sm_ltks":
if _, exists := chunk["content_with_weight"]; !exists {
d["content"] = v
}
case "authors_tks":
d["authors"] = v
case "authors_sm_tks":
if _, exists := chunk["authors_tks"]; !exists {
d["authors"] = v
}
case "question_kwd":
d["questions"] = strings.Join(utility.ConvertToStringSlice(v), "\n")
case "tag_kwd":
d["tag_kwd"] = strings.Join(utility.ConvertToStringSlice(v), "###")
case "message_type":
d["message_type_kwd"] = v
case "status":
switch status := v.(type) {
case bool:
if status {
d["status_int"] = 1
} else {
d["status_int"] = 0
}
default:
d["status_int"] = v
}
case "question_tks":
if _, exists := chunk["question_kwd"]; !exists {
d["questions"] = utility.ConvertToString(v)
}
case "kb_id":
if list, ok := v.([]interface{}); ok && len(list) > 0 {
d["kb_id"] = list[0]
} else {
d["kb_id"] = v
}
case "position_int":
if list, ok := v.([]interface{}); ok {
d["position_int"] = utility.ConvertPositionIntArrayToHex(list)
} else {
d["position_int"] = v
}
case "page_num_int", "top_int":
if list, ok := v.([]interface{}); ok {
d[k] = utility.ConvertIntArrayToHex(list)
} else {
d[k] = v
}
case "chunk_data":
d["chunk_data"] = utility.ConvertMapToJSONString(v)
default:
// Check for *_feas fields
if strings.HasSuffix(k, "_feas") {
jsonBytes, _ := json.Marshal(v)
d[k] = string(jsonBytes)
} else if fieldKeyword(k) {
// keyword fields with list values -> ### joined
if list, ok := v.([]interface{}); ok {
d[k] = strings.Join(utility.ConvertToStringSlice(list), "###")
} else {
d[k] = v
}
} else {
d[k] = v
}
}
}
// Remove intermediate token fields
for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks",
"content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks",
"question_kwd", "question_tks"} {
delete(d, key)
}
// Fill missing embedding columns with zeros if embedding info provided
for _, ec := range embeddingCols {
name, ok1 := ec[0].(string)
size, ok2 := ec[1].(int)
if !ok1 || !ok2 {
continue
}
if _, exists := d[name]; !exists {
zeros := make([]float64, size)
for i := range zeros {
zeros[i] = 0
}
d[name] = zeros
}
}
return d
}
// DropChunkStore drops a chunk table from Infinity
func (e *infinityEngine) DropChunkStore(ctx context.Context, baseName, datasetID string) error {
return e.dropTable(ctx, buildChunkTableName(baseName, datasetID))
}
// ChunkStoreExists checks if a chunk table exists in Infinity
func (e *infinityEngine) ChunkStoreExists(ctx context.Context, baseName, datasetID string) (bool, error) {
return e.tableExists(ctx, buildChunkTableName(baseName, datasetID))
}