//
//  Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
|
|
|
|
|
|
|
|
|
|
package infinity
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
|
|
|
|
"strings"
|
|
|
|
|
|
2026-05-19 17:34:59 +08:00
|
|
|
"ragflow/internal/common"
|
2026-06-08 11:49:37 +08:00
|
|
|
"ragflow/internal/dao"
|
|
|
|
|
"ragflow/internal/engine/types"
|
2026-05-19 17:34:59 +08:00
|
|
|
"ragflow/internal/utility"
|
2026-05-06 10:41:58 +08:00
|
|
|
|
2026-06-18 18:07:27 +08:00
|
|
|
infinity "github.com/infiniflow/infinity-go-sdk"
|
|
|
|
|
|
2026-04-09 09:52:31 +08:00
|
|
|
"go.uber.org/zap"
|
|
|
|
|
)
|
|
|
|
|
|
2026-05-19 17:34:59 +08:00
|
|
|
// CreateMetadataStore creates a metadata table in Infinity
|
|
|
|
|
// tenantID is the tenant identifier used to build the table name
|
|
|
|
|
func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID string) error {
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
|
2026-04-09 09:52:31 +08:00
|
|
|
// Get database
|
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to get database: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check if table already exists
|
2026-05-19 17:34:59 +08:00
|
|
|
exists, err := e.tableExists(ctx, tableName)
|
2026-04-09 09:52:31 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to check if table exists: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if exists {
|
2026-05-19 17:34:59 +08:00
|
|
|
return fmt.Errorf("metadata table '%s' already exists", tableName)
|
2026-04-09 09:52:31 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Use configured doc_meta mapping file
|
|
|
|
|
fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", e.docMetaMappingFileName)
|
|
|
|
|
|
|
|
|
|
schemaData, err := os.ReadFile(fpMapping)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to read mapping file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var schema map[string]fieldInfo
|
|
|
|
|
if err := json.Unmarshal(schemaData, &schema); err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to parse mapping file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build column definitions
|
|
|
|
|
var columns infinity.TableSchema
|
|
|
|
|
for fieldName, fieldInfo := range schema {
|
|
|
|
|
col := infinity.ColumnDefinition{
|
2026-05-06 10:41:58 +08:00
|
|
|
Name: fieldName,
|
2026-04-09 09:52:31 +08:00
|
|
|
DataType: fieldInfo.Type,
|
2026-05-06 10:41:58 +08:00
|
|
|
Default: fieldInfo.Default,
|
2026-04-09 09:52:31 +08:00
|
|
|
}
|
|
|
|
|
columns = append(columns, &col)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create table
|
2026-05-19 17:34:59 +08:00
|
|
|
_, err = db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore)
|
2026-04-09 09:52:31 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to create doc meta table: %w", err)
|
|
|
|
|
}
|
2026-05-19 17:34:59 +08:00
|
|
|
common.Debug("Infinity created doc meta table", zap.String("tableName", tableName))
|
2026-04-09 09:52:31 +08:00
|
|
|
|
|
|
|
|
// Get table for creating indexes
|
2026-05-19 17:34:59 +08:00
|
|
|
table, err := db.GetTable(tableName)
|
2026-04-09 09:52:31 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to get table: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create secondary index on id
|
|
|
|
|
_, err = table.CreateIndex(
|
2026-05-19 17:34:59 +08:00
|
|
|
fmt.Sprintf("idx_%s_id", tableName),
|
2026-04-09 09:52:31 +08:00
|
|
|
infinity.NewIndexInfo("id", infinity.IndexTypeSecondary, nil),
|
|
|
|
|
infinity.ConflictTypeIgnore,
|
|
|
|
|
"",
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to create secondary index on id: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create secondary index on kb_id
|
|
|
|
|
_, err = table.CreateIndex(
|
2026-05-19 17:34:59 +08:00
|
|
|
fmt.Sprintf("idx_%s_kb_id", tableName),
|
2026-04-09 09:52:31 +08:00
|
|
|
infinity.NewIndexInfo("kb_id", infinity.IndexTypeSecondary, nil),
|
|
|
|
|
infinity.ConflictTypeIgnore,
|
|
|
|
|
"",
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to create secondary index on kb_id: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
2026-06-08 11:49:37 +08:00
|
|
|
// Create secondary index on meta_fields for metadata filter queries
|
|
|
|
|
_, err = table.CreateIndex(
|
|
|
|
|
fmt.Sprintf("idx_%s_meta_fields", tableName),
|
|
|
|
|
infinity.NewIndexInfo("meta_fields", infinity.IndexTypeSecondary, nil),
|
|
|
|
|
infinity.ConflictTypeIgnore,
|
|
|
|
|
"",
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to create secondary index on meta_fields: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-09 09:52:31 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// InsertMetadata inserts document metadata into tenant's metadata table
|
|
|
|
|
// Auto-create the table if it doesn't exist
|
|
|
|
|
// Replace existing metadata with same id and kb_id
|
|
|
|
|
func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) {
|
2026-05-19 17:34:59 +08:00
|
|
|
tableName := buildMetadataTableName(tenantID)
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Info("InfinityConnection.InsertMetadata called", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata)))
|
2026-04-09 09:52:31 +08:00
|
|
|
|
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to get database: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table, err := db.GetTable(tableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Table doesn't exist, try to create it
|
|
|
|
|
errMsg := strings.ToLower(err.Error())
|
|
|
|
|
if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") {
|
|
|
|
|
return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create metadata table
|
2026-05-19 17:34:59 +08:00
|
|
|
if createErr := e.CreateMetadataStore(ctx, tenantID); createErr != nil {
|
2026-04-09 09:52:31 +08:00
|
|
|
return nil, fmt.Errorf("Failed to create metadata table: %w", createErr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table, err = db.GetTable(tableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to get table after creation: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Transform metadata - convert meta_fields map to JSON string
|
|
|
|
|
insertMetadata := make([]map[string]interface{}, len(metadata))
|
|
|
|
|
for i, m := range metadata {
|
|
|
|
|
d := make(map[string]interface{})
|
|
|
|
|
for k, v := range m {
|
|
|
|
|
if k == "meta_fields" {
|
|
|
|
|
d["meta_fields"] = utility.ConvertMapToJSONString(v)
|
|
|
|
|
} else {
|
|
|
|
|
d[k] = v
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
insertMetadata[i] = d
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Delete existing metadata with same id and kb_id, then insert new
|
|
|
|
|
if len(insertMetadata) > 0 {
|
|
|
|
|
idList := make([]string, len(insertMetadata))
|
|
|
|
|
for i, m := range insertMetadata {
|
|
|
|
|
// Escape single quotes in values to prevent SQL injection
|
|
|
|
|
docID := fmt.Sprintf("'%s'", strings.ReplaceAll(fmt.Sprintf("%v", m["id"]), "'", "''"))
|
|
|
|
|
kbID := fmt.Sprintf("'%s'", strings.ReplaceAll(fmt.Sprintf("%v", m["kb_id"]), "'", "''"))
|
|
|
|
|
idList[i] = fmt.Sprintf("(id = %s AND kb_id = %s)", docID, kbID)
|
|
|
|
|
}
|
|
|
|
|
filter := strings.Join(idList, " OR ")
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Debug(fmt.Sprintf("Deleting existing metadata with filter: %s", filter))
|
2026-04-09 09:52:31 +08:00
|
|
|
delResp, delErr := table.Delete(filter)
|
|
|
|
|
if delErr != nil {
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Warn(fmt.Sprintf("Failed to delete existing metadata: %v", delErr))
|
2026-04-09 09:52:31 +08:00
|
|
|
} else if delResp.DeletedRows > 0 {
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Info(fmt.Sprintf("Deleted %d existing metadata entries", delResp.DeletedRows))
|
2026-04-09 09:52:31 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Insert metadata
|
|
|
|
|
_, err = table.Insert(insertMetadata)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to insert metadata: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Info("InfinityConnection.InsertMetadata result", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata)))
|
2026-04-09 09:52:31 +08:00
|
|
|
return []string{}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// UpdateMetadata updates or inserts document metadata in tenant's metadata table.
|
2026-06-08 11:49:37 +08:00
|
|
|
//
|
|
|
|
|
// "Updates" here means MERGE, not replace. The supplied metaFields are
|
|
|
|
|
// overlaid on top of the row's existing meta_fields map: keys already
|
|
|
|
|
// present are overwritten with the new value, keys not in the input
|
|
|
|
|
// are preserved, and brand-new keys are added. If no row exists for
|
|
|
|
|
// (docID, datasetID), one is inserted containing exactly metaFields.
|
|
|
|
|
//
|
|
|
|
|
// Examples (existing row → input → resulting meta_fields):
|
|
|
|
|
//
|
|
|
|
|
// {character:["曹操","孙权"], year:2025}
|
|
|
|
|
// + {author:["John","Tom"], category:"tech"}
|
|
|
|
|
// = {character:["曹操","孙权"], year:2025, author:["John","Tom"], category:"tech"}
|
|
|
|
|
//
|
|
|
|
|
// {character:["曹操","孙权"], year:2025}
|
|
|
|
|
// + {year:2025}
|
|
|
|
|
// = {character:["曹操","孙权"], year:2025} // year value unchanged, character preserved
|
|
|
|
|
//
|
|
|
|
|
// (empty / row absent) + {author:"Tom"} = {author:"Tom"}
|
|
|
|
|
//
|
|
|
|
|
// Note: this is at odds with the SET-METADATA CLI's name, which a
|
|
|
|
|
// reader naturally parses as "replace". The merge semantics exist so
|
|
|
|
|
// that user-driven metadata edits compose with auto-extracted fields
|
|
|
|
|
// produced by the LLM extraction pipeline. See the CLI parser in
|
2026-06-29 19:09:32 +08:00
|
|
|
// internal/cli/user_parser.go (parseDevSetMeta) for the user-facing
|
2026-06-08 11:49:37 +08:00
|
|
|
// surface that drives this engine method.
|
2026-05-19 17:34:59 +08:00
|
|
|
func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error {
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
common.Info("InfinityConnection.UpdateMetadata called", zap.String("tableName", tableName), zap.String("docID", docID), zap.String("datasetID", datasetID))
|
2026-04-09 09:52:31 +08:00
|
|
|
|
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get database: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table, err := db.GetTable(tableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get metadata table %s: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-19 17:34:59 +08:00
|
|
|
// Build filter to find existing row by docID and datasetID
|
2026-04-09 09:52:31 +08:00
|
|
|
escapedDocID := strings.ReplaceAll(docID, "'", "''")
|
2026-05-19 17:34:59 +08:00
|
|
|
escapedDatasetID := strings.ReplaceAll(datasetID, "'", "''")
|
|
|
|
|
filter := fmt.Sprintf("id = '%s' AND kb_id = '%s'", escapedDocID, escapedDatasetID)
|
2026-04-09 09:52:31 +08:00
|
|
|
|
|
|
|
|
// Query existing metadata using the chainable API
|
|
|
|
|
queryTable := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0)
|
|
|
|
|
|
|
|
|
|
// Execute query to check if row exists
|
|
|
|
|
result, err := queryTable.ToResult()
|
|
|
|
|
rowExists := false
|
|
|
|
|
if err != nil {
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Warn(fmt.Sprintf("Failed to query existing metadata: %v", err))
|
2026-04-09 09:52:31 +08:00
|
|
|
// If query fails, treat as not exists and insert
|
|
|
|
|
} else {
|
|
|
|
|
// Get results - ToResult returns *infinity.QueryResult
|
|
|
|
|
qr, ok := result.(*infinity.QueryResult)
|
|
|
|
|
// Check if id column has any rows - len(qr.Data["id"]) > 0 means there are rows
|
|
|
|
|
if ok && qr != nil && len(qr.Data["id"]) > 0 {
|
|
|
|
|
rowExists = true
|
|
|
|
|
// Get meta_fields from the first row
|
|
|
|
|
if metaFieldsData, exists := qr.Data["meta_fields"]; exists && len(metaFieldsData) > 0 {
|
|
|
|
|
existingMetaFieldsVal := metaFieldsData[0]
|
|
|
|
|
|
2026-05-25 19:15:07 +08:00
|
|
|
// Parse existing meta_fields if it's a string or []uint8
|
2026-04-09 09:52:31 +08:00
|
|
|
var existingMetaFields map[string]interface{}
|
|
|
|
|
if existingMetaFieldsVal != nil {
|
|
|
|
|
switch v := existingMetaFieldsVal.(type) {
|
|
|
|
|
case string:
|
|
|
|
|
if err := json.Unmarshal([]byte(v), &existingMetaFields); err != nil {
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Warn(fmt.Sprintf("Failed to parse existing meta_fields: %v", err))
|
2026-04-09 09:52:31 +08:00
|
|
|
existingMetaFields = make(map[string]interface{})
|
|
|
|
|
}
|
2026-05-25 19:15:07 +08:00
|
|
|
case []uint8:
|
|
|
|
|
// Handle raw bytes from Infinity - Infinity prefixes JSON with 4 bytes (likely length), skip them
|
|
|
|
|
decoded := v
|
|
|
|
|
if len(decoded) > 4 {
|
|
|
|
|
decoded = decoded[4:]
|
|
|
|
|
}
|
|
|
|
|
if err := json.Unmarshal(decoded, &existingMetaFields); err != nil {
|
|
|
|
|
common.Warn(fmt.Sprintf("Failed to parse existing meta_fields from []uint8: %v", err))
|
|
|
|
|
existingMetaFields = make(map[string]interface{})
|
|
|
|
|
}
|
2026-04-09 09:52:31 +08:00
|
|
|
case map[string]interface{}:
|
|
|
|
|
existingMetaFields = v
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Merge new meta_fields with existing (new values override existing)
|
|
|
|
|
if existingMetaFields == nil {
|
|
|
|
|
existingMetaFields = make(map[string]interface{})
|
|
|
|
|
}
|
|
|
|
|
for k, v := range metaFields {
|
|
|
|
|
existingMetaFields[k] = v
|
|
|
|
|
}
|
|
|
|
|
metaFields = existingMetaFields
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Prepare updated metadata as JSON string
|
|
|
|
|
updatedFields := map[string]interface{}{
|
|
|
|
|
"meta_fields": utility.ConvertMapToJSONString(metaFields),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if rowExists {
|
|
|
|
|
// Row exists: update it with merged metadata
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Info(fmt.Sprintf("UpdateMetadata: updating existing row, table=%s, filter=%s, newValue=%v", tableName, filter, updatedFields))
|
2026-04-09 09:52:31 +08:00
|
|
|
_, err = table.Update(filter, updatedFields)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to update metadata: %w", err)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Row doesn't exist: insert new row
|
|
|
|
|
insertFields := map[string]interface{}{
|
|
|
|
|
"id": docID,
|
2026-05-19 17:34:59 +08:00
|
|
|
"kb_id": datasetID,
|
2026-04-09 09:52:31 +08:00
|
|
|
"meta_fields": utility.ConvertMapToJSONString(metaFields),
|
|
|
|
|
}
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Info(fmt.Sprintf("UpdateMetadata: inserting new row, table=%s, newValue=%v", tableName, insertFields))
|
2026-04-09 09:52:31 +08:00
|
|
|
_, err = table.Insert(insertFields)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to insert metadata: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-06 10:41:58 +08:00
|
|
|
common.Info("InfinityConnection.UpdateMetadata completes", zap.String("tableName", tableName), zap.String("docID", docID))
|
2026-04-09 09:52:31 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
2026-05-19 17:34:59 +08:00
|
|
|
|
|
|
|
|
// DeleteMetadata deletes metadata from tenant's metadata table by condition
|
2026-05-25 19:15:07 +08:00
|
|
|
// Returns the number of deleted documents.
|
2026-05-19 17:34:59 +08:00
|
|
|
func (e *infinityEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
|
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, fmt.Errorf("failed to get database: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table, err := db.GetTable(tableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
common.Warn(fmt.Sprintf("Metadata table %s does not exist, skipping delete", tableName))
|
|
|
|
|
return 0, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get table columns for building filter
|
|
|
|
|
clmns := make(map[string]struct {
|
|
|
|
|
Type string
|
|
|
|
|
Default interface{}
|
|
|
|
|
})
|
|
|
|
|
colsResp, err := table.ShowColumns()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, fmt.Errorf("failed to get columns: %w", err)
|
|
|
|
|
}
|
|
|
|
|
result, ok := colsResp.(*infinity.QueryResult)
|
|
|
|
|
if ok {
|
|
|
|
|
if nameArr, ok := result.Data["name"]; ok {
|
|
|
|
|
if typeArr, ok := result.Data["type"]; ok {
|
|
|
|
|
if defArr, ok := result.Data["default"]; ok {
|
|
|
|
|
for i := 0; i < len(nameArr); i++ {
|
|
|
|
|
colName, _ := nameArr[i].(string)
|
|
|
|
|
colType, _ := typeArr[i].(string)
|
|
|
|
|
var colDefault interface{}
|
|
|
|
|
if i < len(defArr) {
|
|
|
|
|
colDefault = defArr[i]
|
|
|
|
|
}
|
|
|
|
|
clmns[colName] = struct {
|
|
|
|
|
Type string
|
|
|
|
|
Default interface{}
|
|
|
|
|
}{colType, colDefault}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build filter from condition
|
|
|
|
|
filter := buildFilterFromCondition(condition, clmns)
|
|
|
|
|
|
|
|
|
|
delResp, err := table.Delete(filter)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, fmt.Errorf("failed to delete metadata: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return delResp.DeletedRows, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-25 19:15:07 +08:00
|
|
|
// DeleteMetadataKeys deletes specific metadata keys from a document's meta_fields.
|
|
|
|
|
// If deleting those keys leaves no metadata entries, the metadata row is removed.
|
|
|
|
|
func (e *infinityEngine) DeleteMetadataKeys(ctx context.Context, docID string, datasetID string, keys []string, tenantID string) error {
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
common.Info("InfinityConnection.DeleteMetadataKeys called", zap.String("tableName", tableName), zap.String("docID", docID), zap.Any("keys", keys))
|
|
|
|
|
|
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get database: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table, err := db.GetTable(tableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get metadata table %s: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build filter to find the document
|
|
|
|
|
escapedDocID := strings.ReplaceAll(docID, "'", "''")
|
|
|
|
|
escapedDatasetID := strings.ReplaceAll(datasetID, "'", "''")
|
|
|
|
|
filter := fmt.Sprintf("id = '%s' AND kb_id = '%s'", escapedDocID, escapedDatasetID)
|
|
|
|
|
|
|
|
|
|
// Query existing metadata to get current meta_fields
|
|
|
|
|
queryTable := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0)
|
|
|
|
|
result, err := queryTable.ToResult()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to query existing metadata: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qr, ok := result.(*infinity.QueryResult)
|
|
|
|
|
if !ok || qr == nil || len(qr.Data["id"]) == 0 {
|
|
|
|
|
return fmt.Errorf("document not found: %s", docID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get existing meta_fields
|
|
|
|
|
var existingMetaFields map[string]interface{}
|
|
|
|
|
if metaFieldsData, exists := qr.Data["meta_fields"]; exists && len(metaFieldsData) > 0 {
|
|
|
|
|
if metaFieldsData[0] != nil {
|
|
|
|
|
switch v := metaFieldsData[0].(type) {
|
|
|
|
|
case string:
|
|
|
|
|
if err := json.Unmarshal([]byte(v), &existingMetaFields); err != nil {
|
|
|
|
|
common.Warn("Failed to parse meta_fields JSON", zap.String("docID", docID), zap.Error(err))
|
|
|
|
|
existingMetaFields = make(map[string]interface{})
|
|
|
|
|
}
|
|
|
|
|
case []uint8:
|
|
|
|
|
// Handle raw bytes from Infinity - Infinity prefixes JSON with 4 bytes (likely length), skip them
|
|
|
|
|
decoded := v
|
|
|
|
|
if len(decoded) > 4 {
|
|
|
|
|
decoded = decoded[4:]
|
|
|
|
|
}
|
|
|
|
|
if err := json.Unmarshal(decoded, &existingMetaFields); err != nil {
|
|
|
|
|
common.Warn("Failed to parse meta_fields JSON from []uint8", zap.String("docID", docID), zap.String("err", err.Error()))
|
|
|
|
|
existingMetaFields = make(map[string]interface{})
|
|
|
|
|
}
|
|
|
|
|
case map[string]interface{}:
|
|
|
|
|
existingMetaFields = v
|
|
|
|
|
default:
|
|
|
|
|
common.Debug("meta_fields unexpected type", zap.String("type", fmt.Sprintf("%T", metaFieldsData[0])), zap.Any("value", metaFieldsData[0]))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
common.Debug("meta_fields not found in qr.Data or empty", zap.Any("exists", exists))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if existingMetaFields == nil {
|
|
|
|
|
existingMetaFields = make(map[string]interface{})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build set of keys to remove
|
|
|
|
|
keysToRemove := make(map[string]bool)
|
|
|
|
|
for _, k := range keys {
|
|
|
|
|
keysToRemove[k] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check if any keys actually exist and would be removed
|
|
|
|
|
hasKeysToRemove := false
|
|
|
|
|
for k := range existingMetaFields {
|
|
|
|
|
if keysToRemove[k] {
|
|
|
|
|
hasKeysToRemove = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !hasKeysToRemove {
|
|
|
|
|
common.Info(
|
|
|
|
|
"No matching keys to delete from document",
|
|
|
|
|
zap.String("docID", docID),
|
|
|
|
|
zap.Int("existingMetaFieldCount", len(existingMetaFields)),
|
|
|
|
|
zap.Int("keysCount", len(keys)),
|
|
|
|
|
)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Count remaining keys after deletion (keys that are NOT being removed)
|
|
|
|
|
remainingKeys := 0
|
|
|
|
|
for k := range existingMetaFields {
|
|
|
|
|
if !keysToRemove[k] {
|
|
|
|
|
remainingKeys++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If no other keys would remain after deletion, delete the document directly
|
|
|
|
|
if remainingKeys == 0 {
|
|
|
|
|
common.Info("All metadata keys would be deleted, removing document from index", zap.String("docID", docID))
|
|
|
|
|
|
|
|
|
|
// Use existing DeleteMetadata method which handles the deletion properly
|
|
|
|
|
condition := map[string]interface{}{
|
|
|
|
|
"id": docID,
|
|
|
|
|
"kb_id": datasetID,
|
|
|
|
|
}
|
|
|
|
|
_, err := e.DeleteMetadata(ctx, condition, tenantID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to delete document: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
common.Info("Successfully removed document with empty meta_fields", zap.String("docID", docID))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Some keys will remain, so remove only the specified keys
|
|
|
|
|
for _, key := range keys {
|
|
|
|
|
delete(existingMetaFields, key)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update with the modified metadata
|
|
|
|
|
updatedFields := map[string]interface{}{
|
|
|
|
|
"meta_fields": utility.ConvertMapToJSONString(existingMetaFields),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, err = table.Update(filter, updatedFields)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to delete metadata keys: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
common.Info("InfinityConnection.DeleteMetadataKeys completed", zap.String("tableName", tableName), zap.String("docID", docID))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-19 17:34:59 +08:00
|
|
|
// DropMetadataStore drops a metadata table from Infinity
|
|
|
|
|
func (e *infinityEngine) DropMetadataStore(ctx context.Context, tenantID string) error {
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
return e.dropTable(ctx, tableName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MetadataStoreExists checks if a metadata table exists in Infinity
|
|
|
|
|
func (e *infinityEngine) MetadataStoreExists(ctx context.Context, tenantID string) (bool, error) {
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
return e.tableExists(ctx, tableName)
|
2026-06-08 11:49:37 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SearchMetadata executes search specifically for metadata tables
|
|
|
|
|
// This is separate from Search() which handles only chunk tables
|
|
|
|
|
func (e *infinityEngine) SearchMetadata(ctx context.Context, req *types.SearchMetadataRequest) (*types.SearchMetadataResult, error) {
|
|
|
|
|
tenantID := req.TenantID
|
|
|
|
|
common.Debug("SearchMetadata in Infinity started", zap.String("tenantID", tenantID))
|
|
|
|
|
|
|
|
|
|
// Validate inputs
|
|
|
|
|
if tenantID == "" {
|
|
|
|
|
return nil, fmt.Errorf("tenantID cannot be empty")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build table name from tenantID
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
|
|
|
|
|
exists, err := e.tableExists(ctx, tableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
common.Warn("Infinity SearchMetadata table existence check failed", zap.String("table", tableName), zap.Error(err))
|
|
|
|
|
return nil, fmt.Errorf("failed to check metadata table existence: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if !exists {
|
|
|
|
|
common.Debug("Infinity SearchMetadata table absent, returning empty result", zap.String("table", tableName))
|
|
|
|
|
// Return an empty (non-nil) slice — Python returns `[]`, and a
|
|
|
|
|
// nil slice is read by callers as "fall back to in-memory". A
|
|
|
|
|
// zero-match against an absent table is a definitive answer,
|
|
|
|
|
// not a missing-data condition.
|
|
|
|
|
return &types.SearchMetadataResult{
|
|
|
|
|
MetadataRecords: []map[string]interface{}{},
|
|
|
|
|
Total: 0,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-06-18 18:07:27 +08:00
|
|
|
// Build output columns: use caller-specified fields, or "*" for all columns
|
|
|
|
|
var outputColumns []string
|
|
|
|
|
if len(req.SelectFields) > 0 {
|
|
|
|
|
outputColumns = req.SelectFields
|
|
|
|
|
} else {
|
|
|
|
|
outputColumns = []string{"*"}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Pagination defaults
|
|
|
|
|
pageSize := req.Limit
|
|
|
|
|
if pageSize <= 0 {
|
|
|
|
|
pageSize = 30
|
|
|
|
|
}
|
|
|
|
|
offset := req.Offset
|
|
|
|
|
if offset < 0 {
|
|
|
|
|
offset = 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build filter from req.Filter
|
|
|
|
|
var filterStr string
|
|
|
|
|
if req.Filter != nil {
|
|
|
|
|
filterStr = equivalentConditionToStr(req.Filter)
|
2026-06-08 11:49:37 +08:00
|
|
|
}
|
|
|
|
|
|
2026-06-18 18:07:27 +08:00
|
|
|
// Get database and table
|
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
2026-06-08 11:49:37 +08:00
|
|
|
if err != nil {
|
2026-06-18 18:07:27 +08:00
|
|
|
return nil, fmt.Errorf("failed to get database: %w", err)
|
2026-06-08 11:49:37 +08:00
|
|
|
}
|
2026-06-18 18:07:27 +08:00
|
|
|
|
|
|
|
|
tbl, err := db.GetTable(tableName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to get metadata table %s: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build Infinity query (chainable API)
|
|
|
|
|
table := tbl.Output(outputColumns)
|
|
|
|
|
if filterStr != "" {
|
|
|
|
|
table = table.Filter(filterStr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add order_by if provided
|
|
|
|
|
if req.OrderBy != nil && len(req.OrderBy.Fields) > 0 {
|
|
|
|
|
var sortFields [][2]interface{}
|
|
|
|
|
for _, orderField := range req.OrderBy.Fields {
|
|
|
|
|
sortType := infinity.SortTypeAsc
|
|
|
|
|
if orderField.Type == types.SortDesc {
|
|
|
|
|
sortType = infinity.SortTypeDesc
|
|
|
|
|
}
|
|
|
|
|
sortFields = append(sortFields, [2]interface{}{orderField.Field, sortType})
|
|
|
|
|
}
|
|
|
|
|
table = table.Sort(sortFields)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table = table.Limit(pageSize)
|
|
|
|
|
if offset > 0 {
|
|
|
|
|
table = table.Offset(offset)
|
|
|
|
|
}
|
|
|
|
|
table = table.Option(map[string]interface{}{"total_hits_count": true})
|
|
|
|
|
|
|
|
|
|
// Execute query
|
|
|
|
|
df, err := table.ToDataFrame()
|
|
|
|
|
if err != nil {
|
|
|
|
|
common.Warn("Infinity SearchMetadata query failed",
|
|
|
|
|
zap.String("tableName", tableName),
|
|
|
|
|
zap.Error(err))
|
|
|
|
|
return nil, fmt.Errorf("metadata query failed: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Convert column-oriented DataFrame to row-oriented records
|
|
|
|
|
records := make([]map[string]interface{}, 0)
|
|
|
|
|
for colName, colData := range df.ColumnData {
|
|
|
|
|
for i, val := range colData {
|
|
|
|
|
for len(records) <= i {
|
|
|
|
|
records = append(records, make(map[string]interface{}))
|
|
|
|
|
}
|
|
|
|
|
records[i][colName] = val
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Handle ROW_ID -> row_id() mapping (Infinity internal column)
|
|
|
|
|
for _, rec := range records {
|
|
|
|
|
if val, ok := rec["ROW_ID"]; ok {
|
|
|
|
|
rec["row_id()"] = val
|
|
|
|
|
delete(rec, "ROW_ID")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Realign meta_fields column for multi-row queries (Infinity may
|
|
|
|
|
// concatenate values into one blob with 4-byte length prefix)
|
|
|
|
|
realignMetaFieldsColumn(records)
|
|
|
|
|
|
|
|
|
|
// Parse total_hits_count from ExtraInfo
|
|
|
|
|
var totalHits int64
|
|
|
|
|
if df.ExtraInfo != "" {
|
|
|
|
|
if t, ok := totalHitsFromInfinityExtraInfo(df.ExtraInfo); ok {
|
|
|
|
|
totalHits = t
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
common.Debug("SearchMetadata in Infinity completed",
|
|
|
|
|
zap.Int("rows", len(records)),
|
|
|
|
|
zap.Int64("total", totalHits))
|
|
|
|
|
|
2026-06-08 11:49:37 +08:00
|
|
|
return &types.SearchMetadataResult{
|
2026-06-18 18:07:27 +08:00
|
|
|
MetadataRecords: records,
|
|
|
|
|
Total: totalHits,
|
2026-06-08 11:49:37 +08:00
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// parseLengthPrefixedJSON decodes Infinity's length-prefixed JSON format: a
// sequence of [4-byte little-endian length][JSON object] records. It returns
// the decoded objects in order.
//
// This is the same wire format the service layer's
// ParseAllLengthPrefixedJSON understands; it is duplicated here to keep the
// engine package free of service-layer dependencies. Infinity's SDK produces
// it for VARCHAR/TEXT columns when a query matches several rows: instead of
// one byte array per row, all values are concatenated into a single blob,
// each prefixed with its length.
//
// Parsing stops at the first record that has a zero length, declares more
// bytes than remain in data, or does not decode as a JSON object; records
// decoded before that point are still returned. The result is nil when data
// is shorter than one length prefix or when no record could be decoded.
func parseLengthPrefixedJSON(data []byte) []map[string]interface{} {
	const prefixLen = 4

	var results []map[string]interface{}
	for offset := 0; len(data)-offset >= prefixLen; {
		// 4-byte little-endian record length.
		length := uint32(data[offset]) |
			uint32(data[offset+1])<<8 |
			uint32(data[offset+2])<<16 |
			uint32(data[offset+3])<<24

		payloadStart := offset + prefixLen
		remaining := len(data) - payloadStart

		// Compare in uint64: converting a huge declared length to int first
		// would wrap to a negative value where int is 32 bits, slip past the
		// bounds check, and panic when slicing.
		if length == 0 || uint64(length) > uint64(remaining) {
			break
		}
		payloadEnd := payloadStart + int(length)

		var record map[string]interface{}
		if err := json.Unmarshal(data[payloadStart:payloadEnd], &record); err != nil {
			break
		}
		results = append(results, record)
		offset = payloadEnd
	}
	return results
}
|
|
|
|
|
|
|
|
|
|
// realignMetaFieldsColumn fixes a column-oriented data-frame
|
|
|
|
|
// misalignment that happens when Infinity's SDK returns the
|
|
|
|
|
// `meta_fields` column for a multi-row query as a single
|
|
|
|
|
// length-prefixed byte array instead of one entry per row. After the
|
|
|
|
|
// column→row loop has run, the first matching chunk holds the entire
|
|
|
|
|
// concatenated blob and the rest are missing the field. This function
|
|
|
|
|
// splits the blob into per-row JSON objects and reattaches them in
|
|
|
|
|
// order to the chunks that need them.
|
|
|
|
|
//
|
|
|
|
|
// Safe no-op when:
|
|
|
|
|
// - there are no chunks
|
|
|
|
|
// - the `meta_fields` column is already aligned (one byte array per
|
|
|
|
|
// chunk), so a length-prefixed parse of any single value yields
|
|
|
|
|
// exactly one object
|
|
|
|
|
// - the byte array doesn't parse as length-prefixed JSON
|
|
|
|
|
func realignMetaFieldsColumn(chunks []map[string]interface{}) {
|
|
|
|
|
if len(chunks) < 2 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
firstVal, ok := chunks[0]["meta_fields"]
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
firstBytes, ok := firstVal.([]byte)
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
parsed := parseLengthPrefixedJSON(firstBytes)
|
|
|
|
|
if len(parsed) != len(chunks) {
|
|
|
|
|
// Either the blob didn't parse as length-prefixed, or it
|
|
|
|
|
// parsed to a different count than the number of chunks we
|
|
|
|
|
// built. In either case, don't risk misattributing data.
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for i, meta := range parsed {
|
|
|
|
|
chunks[i]["meta_fields"] = meta
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// metaPushdownMaxSize caps how many doc IDs the metadata push-down is
|
|
|
|
|
// willing to return in one shot. Matches the Python reference
|
|
|
|
|
// (DocMetadataService.filter_doc_ids_by_meta_pushdown, default limit=10000)
|
|
|
|
|
// and ES's default index.max_result_window.
|
|
|
|
|
//
|
|
|
|
|
// When the underlying query matches more than this, the push-down
|
|
|
|
|
// returns nil and the caller falls back to the in-memory meta_filter,
|
|
|
|
|
// which is correct (just slower for very large result sets). Returning
|
|
|
|
|
// a truncated slice as a definitive answer would silently drop docs.
|
|
|
|
|
const metaPushdownMaxSize = 10000 // passed to the query's Limit and compared against the reported total hit count in FilterDocIdsByMetaPushdown
|
|
|
|
|
|
|
|
|
|
// FilterDocIdsByMetaPushdown runs a metadata filter directly against the Infinity table.
|
|
|
|
|
//
|
|
|
|
|
// Return value convention (matching Python's filter_doc_ids_by_meta_pushdown):
|
|
|
|
|
//
|
|
|
|
|
// nil -> push-down was not viable / errored / result overflowed the
|
|
|
|
|
// push-down cap (caller should fall back to in-memory)
|
|
|
|
|
// []string{} -> push-down succeeded but found 0 matching docs (empty result is definitive)
|
|
|
|
|
func (e *infinityEngine) FilterDocIdsByMetaPushdown(ctx context.Context, kbIDs []string, conditions []map[string]interface{}, logic string) []string {
|
|
|
|
|
if len(conditions) == 0 || len(kbIDs) == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check if push-down is supported
|
|
|
|
|
if !IsPushdownSupported(conditions) {
|
|
|
|
|
common.Debug("FilterDocIdsByMetaPushdown: push-down not supported for some filters")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get tenant ID from first KB
|
|
|
|
|
tenantID, err := dao.GetTenantIDByKBID(kbIDs[0])
|
|
|
|
|
if err != nil {
|
|
|
|
|
common.Warn("FilterDocIdsByMetaPushdown: failed to get tenant for KB", zap.String("kbID", kbIDs[0]), zap.Error(err))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tableName := buildMetadataTableName(tenantID)
|
|
|
|
|
|
|
|
|
|
// Build SQL WHERE clause using the full meta_filter logic
|
|
|
|
|
whereClause, err := BuildInfinityFilter(conditions, logic)
|
|
|
|
|
if err != nil {
|
|
|
|
|
common.Debug("FilterDocIdsByMetaPushdown: build filter failed", zap.String("error", err.Error()))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add KB filter using IN clause. Escape any single quotes in the IDs
|
|
|
|
|
// defensively — KB IDs are normally UUIDs, but malformed input must
|
|
|
|
|
// not be able to break out of the literal and alter the query.
|
|
|
|
|
quotedKBIDs := make([]string, len(kbIDs))
|
|
|
|
|
for i, kbID := range kbIDs {
|
|
|
|
|
quotedKBIDs[i] = "'" + strings.ReplaceAll(kbID, "'", "''") + "'"
|
|
|
|
|
}
|
|
|
|
|
kbFilter := "kb_id IN (" + strings.Join(quotedKBIDs, ", ") + ")"
|
|
|
|
|
// Wrap the translated predicate in parens so the AND with the KB clause
|
|
|
|
|
// doesn't get re-grouped by an internal OR. Without the parens,
|
|
|
|
|
// `kbFilter AND a OR b` parses as `(kbFilter AND a) OR b`, which can
|
|
|
|
|
// match rows in other KBs.
|
|
|
|
|
whereClause = kbFilter + " AND (" + whereClause + ")"
|
|
|
|
|
|
|
|
|
|
// Use Infinity connection to execute query
|
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
|
|
|
if err != nil || db == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table, err := db.GetTable(tableName)
|
|
|
|
|
if err != nil || table == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Execute query using chainable API: Output(...).Filter(...)
|
|
|
|
|
// .Limit(metaPushdownMaxSize) caps the page size, and
|
|
|
|
|
// .Option({total_hits_count: true}) makes the exact match count
|
|
|
|
|
// available in QueryResult.ExtraInfo so we can detect overflow and
|
|
|
|
|
// fall back to the in-memory meta_filter rather than silently
|
|
|
|
|
// returning a truncated slice (which the caller treats as definitive).
|
|
|
|
|
common.Debug("FilterDocIdsByMetaPushdown executing Infinity query", zap.String("whereClause", whereClause))
|
|
|
|
|
queryTable := table.Output([]string{"id"}).Filter(whereClause)
|
|
|
|
|
queryTable = queryTable.Limit(metaPushdownMaxSize)
|
|
|
|
|
queryTable = queryTable.Option(map[string]interface{}{"total_hits_count": true})
|
|
|
|
|
result, err := queryTable.ToResult()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qr, ok := result.(*infinity.QueryResult)
|
|
|
|
|
if !ok || qr == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Detect overflow via the SDK's ExtraInfo payload (a JSON string set
|
|
|
|
|
// when total_hits_count is requested). If we can't parse it, log
|
|
|
|
|
// and fall through — the in-memory path is still correct, just
|
|
|
|
|
// slower.
|
|
|
|
|
if total, parsed := totalHitsFromInfinityExtraInfo(qr.ExtraInfo); parsed {
|
|
|
|
|
if total > int64(metaPushdownMaxSize) {
|
|
|
|
|
common.Warn("FilterDocIdsByMetaPushdown: result exceeds push-down cap, falling back to in-memory",
|
|
|
|
|
zap.Int64("total", total),
|
|
|
|
|
zap.Int("cap", metaPushdownMaxSize),
|
|
|
|
|
zap.Strings("kbIDs", kbIDs),
|
|
|
|
|
)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
} else if qr.ExtraInfo != "" {
|
|
|
|
|
// ExtraInfo was non-empty but didn't carry total_hits_count in the
|
|
|
|
|
// expected shape — unusual, but worth flagging so we don't quietly
|
|
|
|
|
// lose the overflow signal if Infinity changes its payload.
|
|
|
|
|
common.Debug("FilterDocIdsByMetaPushdown: Infinity ExtraInfo present but total_hits_count missing",
|
|
|
|
|
zap.String("extraInfo", qr.ExtraInfo),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Extract doc IDs from the result.
|
|
|
|
|
docIDs := make([]string, 0)
|
|
|
|
|
if idData, exists := qr.Data["id"]; exists {
|
|
|
|
|
for _, id := range idData {
|
|
|
|
|
if idStr, ok := id.(string); ok {
|
|
|
|
|
docIDs = append(docIDs, idStr)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
common.Debug("FilterDocIdsByMetaPushdown returned doc IDs", zap.Int("count", len(docIDs)))
|
|
|
|
|
return docIDs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// totalHitsFromInfinityExtraInfo extracts the total hit count from the JSON
// blob Infinity returns in QueryResult.ExtraInfo when the total_hits_count
// option is set. The layout is not a documented SDK contract, so the lookup
// is deliberately tolerant:
//
//   - the keys "total_hits_count", "totalHitsCount" and "total" are tried
//     in that order
//   - the value may be a JSON number or a string holding a number
//   - integers are read exactly (no float64 round-trip); non-integer
//     numbers such as 12.0 or 1e3 are truncated toward zero
//
// Returns (total, true) when a non-negative value that fits in int64 is
// found. Returns (0, false) when extraInfo is empty, is not a JSON object,
// has none of the keys with a usable value, or the first usable value is
// negative or too large for int64.
func totalHitsFromInfinityExtraInfo(extraInfo string) (int64, bool) {
	if extraInfo == "" {
		return 0, false
	}

	// Keep each value as raw JSON so numbers are not forced through
	// float64, which cannot represent every integer above 2^53.
	var fields map[string]json.RawMessage
	if err := json.Unmarshal([]byte(extraInfo), &fields); err != nil {
		return 0, false
	}

	// Smallest float64 that no longer fits in int64 (2^63).
	const int64Limit = float64(1 << 63)

	for _, key := range []string{"total_hits_count", "totalHitsCount", "total"} {
		raw, ok := fields[key]
		if !ok {
			continue
		}

		// json.Number accepts a JSON number or a quoted numeric string.
		// Anything else (bool, object, array) errors, and null leaves it
		// empty; in both cases move on to the next candidate key.
		var num json.Number
		if err := json.Unmarshal(raw, &num); err != nil || num == "" {
			continue
		}

		if n, err := num.Int64(); err == nil {
			if n < 0 {
				return 0, false
			}
			return n, true
		}

		// Not a plain int64 literal: either fractional/exponent notation
		// or out of range. Fall back to float and range-check before
		// converting, since converting an out-of-range float is not
		// well defined.
		f, err := num.Float64()
		if err != nil {
			continue
		}
		if f < 0 || f >= int64Limit {
			return 0, false
		}
		return int64(f), true
	}
	return 0, false
}
|