mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-03 17:21:59 +08:00
Refact functions in engine in GO (#14981)
### What problem does this PR solve? Refact functions in engine in GO ### Type of change - [x] Refactoring
This commit is contained in:
1216
internal/engine/elasticsearch/chunk.go
Normal file
1216
internal/engine/elasticsearch/chunk.go
Normal file
File diff suppressed because it is too large
Load Diff
98
internal/engine/elasticsearch/common.go
Normal file
98
internal/engine/elasticsearch/common.go
Normal file
@@ -0,0 +1,98 @@
|
||||
//
|
||||
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
)
|
||||
|
||||
// dropIndex deletes an index
|
||||
func (e *elasticsearchEngine) dropIndex(ctx context.Context, indexName string) error {
|
||||
if indexName == "" {
|
||||
return fmt.Errorf("index name cannot be empty")
|
||||
}
|
||||
|
||||
// Check if index exists
|
||||
exists, err := e.indexExists(ctx, indexName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return fmt.Errorf("index '%s' does not exist", indexName)
|
||||
}
|
||||
|
||||
// Delete index
|
||||
req := esapi.IndicesDeleteRequest{
|
||||
Index: []string{indexName},
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete index: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
reason := extractErrorReason(bodyBytes)
|
||||
if reason != "" {
|
||||
return fmt.Errorf("elasticsearch error: %s", reason)
|
||||
}
|
||||
return fmt.Errorf("elasticsearch returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// indexExists checks if index exists
|
||||
func (e *elasticsearchEngine) indexExists(ctx context.Context, indexName string) (bool, error) {
|
||||
if indexName == "" {
|
||||
return false, fmt.Errorf("index name cannot be empty")
|
||||
}
|
||||
|
||||
req := esapi.IndicesExistsRequest{
|
||||
Index: []string{indexName},
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.StatusCode == 200 {
|
||||
return true, nil
|
||||
} else if res.StatusCode == 404 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
reason := extractErrorReason(bodyBytes)
|
||||
if reason != "" {
|
||||
return false, fmt.Errorf("elasticsearch error: %s", reason)
|
||||
}
|
||||
return false, fmt.Errorf("elasticsearch returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
// buildMetadataIndexName returns the metadata index name for a tenant
|
||||
func buildMetadataIndexName(tenantID string) string {
|
||||
return fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
|
||||
}
|
||||
@@ -1,49 +0,0 @@
|
||||
//
|
||||
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"ragflow/internal/engine/types"
|
||||
)
|
||||
|
||||
// GetChunk gets a chunk by ID
|
||||
func (e *elasticsearchEngine) GetChunk(ctx context.Context, indexName, chunkID string, kbIDs []string) (interface{}, error) {
|
||||
// Build unified search request to get the chunk by ID
|
||||
searchReq := &types.SearchRequest{
|
||||
IndexNames: []string{indexName},
|
||||
Limit: 1,
|
||||
Offset: 0,
|
||||
Filter: map[string]interface{}{
|
||||
"id": chunkID,
|
||||
},
|
||||
}
|
||||
|
||||
// Execute search
|
||||
searchResp, err := e.Search(ctx, searchReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to search: %w", err)
|
||||
}
|
||||
|
||||
if len(searchResp.Chunks) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return searchResp.Chunks[0], nil
|
||||
}
|
||||
@@ -1,362 +0,0 @@
|
||||
//
|
||||
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
)
|
||||
|
||||
// CreateDataset creates an index
|
||||
func (e *elasticsearchEngine) CreateDataset(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error {
|
||||
if indexName == "" {
|
||||
return fmt.Errorf("index name cannot be empty")
|
||||
}
|
||||
|
||||
// Check if index already exists
|
||||
exists, err := e.TableExists(ctx, indexName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
if exists {
|
||||
return fmt.Errorf("index '%s' already exists", indexName)
|
||||
}
|
||||
|
||||
// Load mapping based on index type
|
||||
var mapping map[string]interface{}
|
||||
if datasetID == "skill" {
|
||||
// Load skill-specific mapping
|
||||
skillMapping, err := loadSkillMapping()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load skill mapping: %w", err)
|
||||
}
|
||||
mapping = skillMapping
|
||||
} else {
|
||||
// Default mapping for dataset
|
||||
mapping = map[string]interface{}{
|
||||
"settings": map[string]interface{}{
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare request body
|
||||
var body io.Reader
|
||||
if mapping != nil {
|
||||
data, err := json.Marshal(mapping)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal mapping: %w", err)
|
||||
}
|
||||
body = bytes.NewReader(data)
|
||||
}
|
||||
|
||||
// Create index
|
||||
req := esapi.IndicesCreateRequest{
|
||||
Index: indexName,
|
||||
Body: body,
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create index: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
reason := extractErrorReason(bodyBytes)
|
||||
if reason != "" {
|
||||
return fmt.Errorf("elasticsearch error: %s", reason)
|
||||
}
|
||||
return fmt.Errorf("elasticsearch returned error: %s, body: %s", res.Status(), string(bodyBytes))
|
||||
}
|
||||
|
||||
// Parse response
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||||
return fmt.Errorf("failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
acknowledged, ok := result["acknowledged"].(bool)
|
||||
if !ok || !acknowledged {
|
||||
return fmt.Errorf("index creation not acknowledged")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadSkillMapping loads the skill index mapping from config file
|
||||
func loadSkillMapping() (map[string]interface{}, error) {
|
||||
// Try multiple possible locations for the mapping file
|
||||
possiblePaths := []string{
|
||||
"conf/skill_es_mapping.json",
|
||||
"../conf/skill_es_mapping.json",
|
||||
"/app/conf/skill_es_mapping.json",
|
||||
}
|
||||
|
||||
var data []byte
|
||||
var err error
|
||||
for _, path := range possiblePaths {
|
||||
data, err = os.ReadFile(path)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// Fallback to default skill mapping if file not found
|
||||
return getDefaultSkillMapping(), nil
|
||||
}
|
||||
|
||||
var mapping map[string]interface{}
|
||||
if err := json.Unmarshal(data, &mapping); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse skill mapping: %w", err)
|
||||
}
|
||||
|
||||
return mapping, nil
|
||||
}
|
||||
|
||||
// getDefaultSkillMapping returns the default skill index mapping
|
||||
func getDefaultSkillMapping() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"settings": map[string]interface{}{
|
||||
"index": map[string]interface{}{
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0,
|
||||
"refresh_interval": "1000ms",
|
||||
},
|
||||
},
|
||||
"mappings": map[string]interface{}{
|
||||
"dynamic": false,
|
||||
"properties": map[string]interface{}{
|
||||
"skill_id": map[string]interface{}{
|
||||
"type": "keyword",
|
||||
"store": true,
|
||||
},
|
||||
"name": map[string]interface{}{
|
||||
"type": "text",
|
||||
"index": false,
|
||||
"store": true,
|
||||
},
|
||||
"name_tks": map[string]interface{}{
|
||||
"type": "text",
|
||||
"analyzer": "whitespace",
|
||||
"store": true,
|
||||
},
|
||||
"tags": map[string]interface{}{
|
||||
"type": "text",
|
||||
"index": false,
|
||||
"store": true,
|
||||
},
|
||||
"tags_tks": map[string]interface{}{
|
||||
"type": "text",
|
||||
"analyzer": "whitespace",
|
||||
"store": true,
|
||||
},
|
||||
"description": map[string]interface{}{
|
||||
"type": "text",
|
||||
"index": false,
|
||||
"store": true,
|
||||
},
|
||||
"description_tks": map[string]interface{}{
|
||||
"type": "text",
|
||||
"analyzer": "whitespace",
|
||||
"store": true,
|
||||
},
|
||||
"content": map[string]interface{}{
|
||||
"type": "text",
|
||||
"index": false,
|
||||
"store": true,
|
||||
},
|
||||
"content_tks": map[string]interface{}{
|
||||
"type": "text",
|
||||
"analyzer": "whitespace",
|
||||
"store": true,
|
||||
},
|
||||
"q_3072_vec": map[string]interface{}{
|
||||
"type": "dense_vector",
|
||||
"dims": 3072,
|
||||
"index": true,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
"q_2560_vec": map[string]interface{}{
|
||||
"type": "dense_vector",
|
||||
"dims": 2560,
|
||||
"index": true,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
"q_1536_vec": map[string]interface{}{
|
||||
"type": "dense_vector",
|
||||
"dims": 1536,
|
||||
"index": true,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
"q_1024_vec": map[string]interface{}{
|
||||
"type": "dense_vector",
|
||||
"dims": 1024,
|
||||
"index": true,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
"q_768_vec": map[string]interface{}{
|
||||
"type": "dense_vector",
|
||||
"dims": 768,
|
||||
"index": true,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
"q_512_vec": map[string]interface{}{
|
||||
"type": "dense_vector",
|
||||
"dims": 512,
|
||||
"index": true,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
"q_256_vec": map[string]interface{}{
|
||||
"type": "dense_vector",
|
||||
"dims": 256,
|
||||
"index": true,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
"version": map[string]interface{}{
|
||||
"type": "keyword",
|
||||
"store": true,
|
||||
},
|
||||
"status": map[string]interface{}{
|
||||
"type": "keyword",
|
||||
"store": true,
|
||||
},
|
||||
"create_time": map[string]interface{}{
|
||||
"type": "long",
|
||||
"store": true,
|
||||
},
|
||||
"update_time": map[string]interface{}{
|
||||
"type": "long",
|
||||
"store": true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// DropTable deletes an index
|
||||
func (e *elasticsearchEngine) DropTable(ctx context.Context, indexName string) error {
|
||||
if indexName == "" {
|
||||
return fmt.Errorf("index name cannot be empty")
|
||||
}
|
||||
|
||||
// Check if index exists
|
||||
exists, err := e.TableExists(ctx, indexName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return fmt.Errorf("index '%s' does not exist", indexName)
|
||||
}
|
||||
|
||||
// Delete index
|
||||
req := esapi.IndicesDeleteRequest{
|
||||
Index: []string{indexName},
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete index: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
reason := extractErrorReason(bodyBytes)
|
||||
if reason != "" {
|
||||
return fmt.Errorf("elasticsearch error: %s", reason)
|
||||
}
|
||||
return fmt.Errorf("elasticsearch returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TableExists checks if index exists
|
||||
func (e *elasticsearchEngine) TableExists(ctx context.Context, indexName string) (bool, error) {
|
||||
if indexName == "" {
|
||||
return false, fmt.Errorf("index name cannot be empty")
|
||||
}
|
||||
|
||||
req := esapi.IndicesExistsRequest{
|
||||
Index: []string{indexName},
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.StatusCode == 200 {
|
||||
return true, nil
|
||||
} else if res.StatusCode == 404 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
reason := extractErrorReason(bodyBytes)
|
||||
if reason != "" {
|
||||
return false, fmt.Errorf("elasticsearch error: %s", reason)
|
||||
}
|
||||
return false, fmt.Errorf("elasticsearch returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
// CreateMetadata creates the document metadata index
|
||||
func (e *elasticsearchEngine) CreateMetadata(ctx context.Context, indexName string) error {
|
||||
// TODO
|
||||
return nil
|
||||
}
|
||||
|
||||
// InsertDataset inserts documents into a dataset index
|
||||
func (e *elasticsearchEngine) InsertDataset(ctx context.Context, documents []map[string]interface{}, indexName string, knowledgebaseID string) ([]string, error) {
|
||||
// TODO
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
// InsertMetadata inserts documents into tenant's metadata index
|
||||
func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error) {
|
||||
// TODO
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
// UpdateDataset updates a chunk by condition
|
||||
func (e *elasticsearchEngine) UpdateDataset(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, tableNamePrefix string, knowledgebaseID string) error {
|
||||
// TODO
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateMetadata updates document metadata in tenant's metadata index
|
||||
func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, kbID string, metaFields map[string]interface{}, tenantID string) error {
|
||||
// TODO
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes rows from either a dataset index or metadata index
|
||||
func (e *elasticsearchEngine) Delete(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) {
|
||||
// TODO
|
||||
return 0, nil
|
||||
}
|
||||
275
internal/engine/elasticsearch/metadata.go
Normal file
275
internal/engine/elasticsearch/metadata.go
Normal file
@@ -0,0 +1,275 @@
|
||||
//
|
||||
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
"ragflow/internal/common"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// CreateMetadataStore creates the document metadata index
|
||||
func (e *elasticsearchEngine) CreateMetadataStore(ctx context.Context, tenantID string) error {
|
||||
indexName := buildMetadataIndexName(tenantID)
|
||||
req := esapi.IndicesCreateRequest{
|
||||
Index: indexName,
|
||||
}
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create metadata index: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("elasticsearch returned error: %s", res.Status())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// InsertMetadata inserts documents into tenant's metadata index
|
||||
func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) {
|
||||
indexName := buildMetadataIndexName(tenantID)
|
||||
common.Info("Inserting metadata into Elasticsearch index", zap.String("index_name", indexName), zap.String("tenant_id", tenantID), zap.Int("doc_count", len(metadata)))
|
||||
|
||||
if len(metadata) == 0 {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
if indexName == "" {
|
||||
return nil, fmt.Errorf("index name cannot be empty")
|
||||
}
|
||||
|
||||
// Check if index exists, create if not
|
||||
exists, err := e.indexExists(ctx, indexName)
|
||||
if err != nil {
|
||||
common.Error("Failed to check index existence", err)
|
||||
return nil, fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
// Create metadata index
|
||||
if createErr := e.CreateMetadataStore(ctx, tenantID); createErr != nil {
|
||||
return nil, fmt.Errorf("failed to create metadata index: %w", createErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Build bulk request body
|
||||
var buf bytes.Buffer
|
||||
for _, doc := range metadata {
|
||||
// Action line - index operation
|
||||
action := map[string]interface{}{
|
||||
"index": map[string]interface{}{
|
||||
"_index": indexName,
|
||||
},
|
||||
}
|
||||
actionBytes, err := json.Marshal(action)
|
||||
if err != nil {
|
||||
common.Error("Failed to marshal bulk action", err)
|
||||
return nil, fmt.Errorf("failed to marshal bulk action: %w", err)
|
||||
}
|
||||
buf.Write(actionBytes)
|
||||
buf.WriteByte('\n')
|
||||
|
||||
// Document line - meta_fields is stored as-is (ES can handle nested objects)
|
||||
docBytes, err := json.Marshal(doc)
|
||||
if err != nil {
|
||||
common.Error("Failed to marshal document", err)
|
||||
return nil, fmt.Errorf("failed to marshal document: %w", err)
|
||||
}
|
||||
buf.Write(docBytes)
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
|
||||
// Execute bulk request
|
||||
req := esapi.BulkRequest{
|
||||
Body: bytes.NewReader(buf.Bytes()),
|
||||
Refresh: "false",
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
common.Error("Failed to execute bulk request", err)
|
||||
return nil, fmt.Errorf("failed to execute bulk request: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status())
|
||||
return nil, fmt.Errorf("elasticsearch bulk request returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
// Parse bulk response to check for errors
|
||||
var bulkResponse map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&bulkResponse); err != nil {
|
||||
common.Error("Failed to parse bulk response", err)
|
||||
return nil, fmt.Errorf("failed to parse bulk response: %w", err)
|
||||
}
|
||||
|
||||
// Check for errors in bulk response
|
||||
if errors, ok := bulkResponse["errors"].(bool); ok && errors {
|
||||
common.Warn("Bulk request had some errors")
|
||||
}
|
||||
|
||||
common.Info("Successfully inserted metadata into Elasticsearch index", zap.String("index_name", indexName), zap.Int("doc_count", len(metadata)))
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
// UpdateMetadata updates document metadata in tenant's metadata index
|
||||
func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error {
|
||||
indexName := buildMetadataIndexName(tenantID)
|
||||
common.Info("Updating metadata in Elasticsearch index", zap.String("index_name", indexName), zap.String("docID", docID), zap.String("datasetID", datasetID))
|
||||
|
||||
// Check if index exists
|
||||
exists, err := e.indexExists(ctx, indexName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return fmt.Errorf("index '%s' does not exist", indexName)
|
||||
}
|
||||
|
||||
// Build the document ID for update
|
||||
docID = strings.ReplaceAll(docID, "'", "''")
|
||||
datasetIDStr := strings.ReplaceAll(datasetID, "'", "''")
|
||||
|
||||
// Build update body - merge meta_fields with existing
|
||||
query := map[string]interface{}{
|
||||
"bool": map[string]interface{}{
|
||||
"must": []map[string]interface{}{
|
||||
{"term": map[string]interface{}{"id": docID}},
|
||||
{"term": map[string]interface{}{"kb_id": datasetIDStr}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
updateReq := map[string]interface{}{
|
||||
"query": query,
|
||||
"script": map[string]interface{}{
|
||||
"source": "ctx._source.meta_fields = params.meta_fields",
|
||||
"params": map[string]interface{}{
|
||||
"meta_fields": metaFields,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
updateBytes, err := json.Marshal(updateReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal update request: %w", err)
|
||||
}
|
||||
|
||||
req := esapi.UpdateByQueryRequest{
|
||||
Index: []string{indexName},
|
||||
Body: bytes.NewReader(updateBytes),
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
common.Error("Failed to execute update by query", err)
|
||||
return fmt.Errorf("failed to execute update by query: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
common.Sugar.Errorw("Elasticsearch update by query returned error", "status", res.Status())
|
||||
return fmt.Errorf("elasticsearch update by query returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
common.Info("Successfully updated metadata in Elasticsearch index", zap.String("index_name", indexName), zap.String("docID", docID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMetadata deletes metadata from tenant's metadata index by condition
|
||||
func (e *elasticsearchEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
|
||||
indexName := buildMetadataIndexName(tenantID)
|
||||
common.Info("Deleting metadata from Elasticsearch index", zap.String("index_name", indexName), zap.Any("condition", condition))
|
||||
|
||||
// Check if index exists
|
||||
exists, err := e.indexExists(ctx, indexName)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to check index existence: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
common.Warn(fmt.Sprintf("Index %s does not exist, skipping delete", indexName))
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Build query from condition
|
||||
query := e.buildMetadataQueryFromCondition(condition)
|
||||
if query == nil {
|
||||
query = map[string]interface{}{"match_all": map[string]interface{}{}}
|
||||
}
|
||||
|
||||
// Build delete by query body
|
||||
deleteBody := map[string]interface{}{
|
||||
"query": query,
|
||||
}
|
||||
|
||||
bodyBytes, err := json.Marshal(deleteBody)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to marshal delete body: %w", err)
|
||||
}
|
||||
|
||||
// Execute delete by query
|
||||
req := esapi.DeleteByQueryRequest{
|
||||
Index: []string{indexName},
|
||||
Body: bytes.NewReader(bodyBytes),
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
common.Error("Failed to execute delete by query", err)
|
||||
return 0, fmt.Errorf("failed to execute delete by query: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
common.Sugar.Errorw("Elasticsearch delete by query returned error", "status", res.Status())
|
||||
return 0, fmt.Errorf("elasticsearch delete by query returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
// Parse response
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||||
common.Error("Failed to parse delete response", err)
|
||||
return 0, fmt.Errorf("failed to parse delete response: %w", err)
|
||||
}
|
||||
|
||||
deleted := int64(0)
|
||||
if d, ok := result["deleted"].(float64); ok {
|
||||
deleted = int64(d)
|
||||
}
|
||||
|
||||
common.Info("Successfully deleted metadata", zap.String("index_name", indexName), zap.Int64("deleted_count", deleted))
|
||||
return deleted, nil
|
||||
}
|
||||
|
||||
// DropMetadataStore drops a metadata index from Elasticsearch
|
||||
func (e *elasticsearchEngine) DropMetadataStore(ctx context.Context, tenantID string) error {
|
||||
indexName := buildMetadataIndexName(tenantID)
|
||||
return e.dropIndex(ctx, indexName)
|
||||
}
|
||||
|
||||
// MetadataStoreExists checks if a metadata index exists in Elasticsearch
|
||||
func (e *elasticsearchEngine) MetadataStoreExists(ctx context.Context, tenantID string) (bool, error) {
|
||||
indexName := buildMetadataIndexName(tenantID)
|
||||
return e.indexExists(ctx, indexName)
|
||||
}
|
||||
@@ -1,583 +0,0 @@
|
||||
//
|
||||
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"ragflow/internal/common"
|
||||
"strings"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"ragflow/internal/engine/types"
|
||||
)
|
||||
|
||||
// SearchResponse Elasticsearch search response
|
||||
type SearchResponse struct {
|
||||
Hits struct {
|
||||
Total struct {
|
||||
Value int64 `json:"value"`
|
||||
} `json:"total"`
|
||||
Hits []struct {
|
||||
ID string `json:"_id"`
|
||||
Score float64 `json:"_score"`
|
||||
Source map[string]interface{} `json:"_source"`
|
||||
} `json:"hits"`
|
||||
} `json:"hits"`
|
||||
Aggregations map[string]interface{} `json:"aggregations"`
|
||||
}
|
||||
|
||||
// Search executes search with unified types.SearchRequest
|
||||
func (e *elasticsearchEngine) Search(ctx context.Context, req *types.SearchRequest) (*types.SearchResult, error) {
|
||||
return e.searchUnified(ctx, req)
|
||||
}
|
||||
|
||||
// searchUnified handles the unified types.SearchRequest
|
||||
func (e *elasticsearchEngine) searchUnified(ctx context.Context, req *types.SearchRequest) (*types.SearchResult, error) {
|
||||
if len(req.IndexNames) == 0 {
|
||||
return nil, fmt.Errorf("index names cannot be empty")
|
||||
}
|
||||
|
||||
// Build pagination parameters
|
||||
offset := req.Offset
|
||||
limit := req.Limit
|
||||
if limit <= 0 {
|
||||
limit = 30 // default ES size
|
||||
}
|
||||
|
||||
// Check if this is a skill index
|
||||
isSkillIndex := len(req.IndexNames) > 0 && strings.HasPrefix(req.IndexNames[0], "skill_")
|
||||
|
||||
// Build filter clauses
|
||||
var filterClauses []map[string]interface{}
|
||||
if isSkillIndex {
|
||||
filterClauses = buildSkillFilterClauses()
|
||||
} else {
|
||||
filterClauses = buildFilterClauses(req.KbIDs, 1)
|
||||
}
|
||||
|
||||
// Add filters from req.Filter
|
||||
if req.Filter != nil && len(req.Filter) > 0 {
|
||||
filterClauses = append(filterClauses, buildFilterFromMap(req.Filter)...)
|
||||
}
|
||||
|
||||
// Build search query body
|
||||
queryBody := make(map[string]interface{})
|
||||
|
||||
// Determine search type from MatchExprs
|
||||
var matchText string
|
||||
var matchDense *types.MatchDenseExpr
|
||||
var hasVectorMatch bool
|
||||
|
||||
for _, expr := range req.MatchExprs {
|
||||
if expr == nil {
|
||||
continue
|
||||
}
|
||||
switch e := expr.(type) {
|
||||
case string:
|
||||
matchText = e
|
||||
case *types.MatchTextExpr:
|
||||
matchText = e.MatchingText
|
||||
case *types.MatchDenseExpr:
|
||||
hasVectorMatch = true
|
||||
matchDense = e
|
||||
}
|
||||
}
|
||||
|
||||
var vectorFieldName string
|
||||
if !hasVectorMatch || matchDense == nil {
|
||||
// Keyword-only search
|
||||
if isSkillIndex {
|
||||
queryBody["query"] = buildSkillKeywordQuery(matchText, filterClauses, 1.0)
|
||||
} else {
|
||||
queryBody["query"] = buildESKeywordQuery(matchText, filterClauses, 1.0)
|
||||
}
|
||||
} else {
|
||||
// Hybrid search: keyword + vector
|
||||
textWeight := 0.7 // default: vector weight = 0.3
|
||||
vectorWeight := 0.3
|
||||
if matchDense.ExtraOptions != nil {
|
||||
if vw, ok := matchDense.ExtraOptions["text_weight"].(float64); ok {
|
||||
textWeight = vw
|
||||
}
|
||||
if vw, ok := matchDense.ExtraOptions["vector_weight"].(float64); ok {
|
||||
vectorWeight = vw
|
||||
}
|
||||
}
|
||||
|
||||
// Build boolean query for text match and filters
|
||||
var boolQuery map[string]interface{}
|
||||
if isSkillIndex {
|
||||
boolQuery = buildSkillKeywordQuery(matchText, filterClauses, 1.0)
|
||||
} else {
|
||||
boolQuery = buildESKeywordQuery(matchText, filterClauses, 1.0)
|
||||
}
|
||||
// Add boost to the bool query (as in Python code)
|
||||
if boolMap, ok := boolQuery["bool"].(map[string]interface{}); ok {
|
||||
boolMap["boost"] = textWeight
|
||||
}
|
||||
|
||||
// Build kNN query
|
||||
vectorData := matchDense.EmbeddingData
|
||||
vectorFieldName = matchDense.VectorColumnName
|
||||
k := matchDense.TopN
|
||||
if k <= 0 {
|
||||
k = req.Limit
|
||||
}
|
||||
if k <= 0 {
|
||||
k = 1024
|
||||
}
|
||||
numCandidates := k * 2
|
||||
|
||||
similarity := 0.0
|
||||
if matchDense.ExtraOptions != nil {
|
||||
if sim, ok := matchDense.ExtraOptions["similarity"].(float64); ok {
|
||||
similarity = sim
|
||||
}
|
||||
}
|
||||
|
||||
knnQuery := map[string]interface{}{
|
||||
"field": vectorFieldName,
|
||||
"query_vector": vectorData,
|
||||
"k": k,
|
||||
"num_candidates": numCandidates,
|
||||
"similarity": similarity,
|
||||
"boost": vectorWeight,
|
||||
}
|
||||
|
||||
queryBody["knn"] = knnQuery
|
||||
queryBody["query"] = boolQuery
|
||||
|
||||
// Add vector column to Source fields (matching Python ES: src.append(f"q_{len(q_vec)}_vec"))
|
||||
// Only modify Source if it was explicitly set by the caller
|
||||
if vectorFieldName != "" && len(req.SelectFields) > 0 {
|
||||
sourceFields := req.SelectFields
|
||||
found := false
|
||||
for _, f := range sourceFields {
|
||||
if f == vectorFieldName {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
sourceFields = append(sourceFields, vectorFieldName)
|
||||
}
|
||||
req.SelectFields = sourceFields
|
||||
}
|
||||
}
|
||||
|
||||
queryBody["size"] = limit
|
||||
queryBody["from"] = offset
|
||||
|
||||
// Add sorting if specified
|
||||
if req.OrderBy != nil {
|
||||
sort := parseOrderByExpr(req.OrderBy)
|
||||
if len(sort) > 0 {
|
||||
queryBody["sort"] = sort
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize query
|
||||
var buf bytes.Buffer
|
||||
if err := json.NewEncoder(&buf).Encode(queryBody); err != nil {
|
||||
return nil, fmt.Errorf("error encoding query: %w", err)
|
||||
}
|
||||
|
||||
// Log search details
|
||||
common.Debug("Elasticsearch searching indices", zap.Strings("indices", req.IndexNames))
|
||||
common.Debug("Elasticsearch DSL", zap.Any("dsl", queryBody))
|
||||
|
||||
// Build search request
|
||||
reqES := esapi.SearchRequest{
|
||||
Index: req.IndexNames,
|
||||
Body: &buf,
|
||||
}
|
||||
|
||||
// Execute search
|
||||
res, err := reqES.Do(ctx, e.client)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("search failed: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
bodyBytes, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
common.Error("Elasticsearch failed to read error response body", err)
|
||||
} else {
|
||||
common.Warn("Elasticsearch error response", zap.String("body", string(bodyBytes)))
|
||||
}
|
||||
return nil, fmt.Errorf("Elasticsearch returned error: %s", res.Status())
|
||||
}
|
||||
|
||||
// Parse response
|
||||
var esResp SearchResponse
|
||||
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
|
||||
return nil, fmt.Errorf("error parsing response: %w", err)
|
||||
}
|
||||
|
||||
// Convert to unified response
|
||||
chunks := convertESResponse(&esResp, vectorFieldName)
|
||||
return &types.SearchResult{
|
||||
Chunks: chunks,
|
||||
Total: esResp.Hits.Total.Value,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// calculatePagination calculates offset and limit based on page, size and topK
|
||||
func calculatePagination(page, size, topK int) (int, int) {
|
||||
if page < 1 {
|
||||
page = 1
|
||||
}
|
||||
if size <= 0 {
|
||||
size = 30
|
||||
}
|
||||
if topK <= 0 {
|
||||
topK = 1024
|
||||
}
|
||||
|
||||
RERANK_LIMIT := max(30, (64/size)*size)
|
||||
if RERANK_LIMIT < size {
|
||||
RERANK_LIMIT = size
|
||||
}
|
||||
if RERANK_LIMIT > topK {
|
||||
RERANK_LIMIT = topK
|
||||
}
|
||||
|
||||
offset := (page - 1) * RERANK_LIMIT
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
|
||||
return offset, RERANK_LIMIT
|
||||
}
|
||||
|
||||
// buildFilterClauses builds ES filter clauses from kb_ids and available_int
|
||||
// Reference: rag/utils/es_conn.py L60-L78
|
||||
// When available=0: available_int < 1
|
||||
// When available!=0: NOT (available_int < 1)
|
||||
func buildFilterClauses(kbIDs []string, available int) []map[string]interface{} {
|
||||
var filters []map[string]interface{}
|
||||
|
||||
if len(kbIDs) > 0 {
|
||||
filters = append(filters, map[string]interface{}{
|
||||
"terms": map[string]interface{}{"kb_id": kbIDs},
|
||||
})
|
||||
}
|
||||
|
||||
// Add available_int filter
|
||||
// Reference: rag/utils/es_conn.py L63-L68
|
||||
if available == 0 {
|
||||
// available_int < 1
|
||||
filters = append(filters, map[string]interface{}{
|
||||
"range": map[string]interface{}{
|
||||
"available_int": map[string]interface{}{
|
||||
"lt": 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
} else {
|
||||
// must_not: available_int < 1 (i.e., available_int >= 1)
|
||||
filters = append(filters, map[string]interface{}{
|
||||
"bool": map[string]interface{}{
|
||||
"must_not": []map[string]interface{}{
|
||||
{
|
||||
"range": map[string]interface{}{
|
||||
"available_int": map[string]interface{}{
|
||||
"lt": 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return filters
|
||||
}
|
||||
|
||||
// buildSkillFilterClauses builds ES filter clauses for skill index
|
||||
// Skill index uses 'status' field instead of 'available_int'
|
||||
func buildSkillFilterClauses() []map[string]interface{} {
|
||||
// Filter for active skills (status = "1")
|
||||
return []map[string]interface{}{
|
||||
{
|
||||
"term": map[string]interface{}{
|
||||
"status": "1",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// buildFilterFromMap converts a generic filter map to ES filter clauses
|
||||
func buildFilterFromMap(filter map[string]interface{}) []map[string]interface{} {
|
||||
var filters []map[string]interface{}
|
||||
for field, value := range filter {
|
||||
switch v := value.(type) {
|
||||
case []string:
|
||||
filters = append(filters, map[string]interface{}{
|
||||
"terms": map[string]interface{}{field: v},
|
||||
})
|
||||
case []interface{}:
|
||||
filters = append(filters, map[string]interface{}{
|
||||
"terms": map[string]interface{}{field: v},
|
||||
})
|
||||
default:
|
||||
filters = append(filters, map[string]interface{}{
|
||||
"term": map[string]interface{}{field: v},
|
||||
})
|
||||
}
|
||||
}
|
||||
return filters
|
||||
}
|
||||
|
||||
// buildESKeywordQuery builds keyword-only search query for ES
|
||||
// Uses query_string if matchText is in query_string format, otherwise uses multi_match
|
||||
// boost is applied to the text match clause (query_string or multi_match)
|
||||
func buildESKeywordQuery(matchText string, filterClauses []map[string]interface{}, boost float64) map[string]interface{} {
|
||||
var mustClause map[string]interface{}
|
||||
|
||||
// Handle wildcard query (match all)
|
||||
if matchText == "*" || matchText == "" {
|
||||
mustClause = map[string]interface{}{
|
||||
"match_all": map[string]interface{}{},
|
||||
}
|
||||
} else {
|
||||
// Use query_string for complex queries
|
||||
queryString := map[string]interface{}{
|
||||
"query": matchText,
|
||||
"fields": []string{"title_tks^10", "title_sm_tks^5", "important_kwd^30", "important_tks^20", "question_tks^20", "content_ltks^2", "content_sm_ltks"},
|
||||
"type": "best_fields",
|
||||
"minimum_should_match": "30%",
|
||||
"boost": boost,
|
||||
}
|
||||
mustClause = map[string]interface{}{
|
||||
"query_string": queryString,
|
||||
}
|
||||
}
|
||||
|
||||
return map[string]interface{}{
|
||||
"bool": map[string]interface{}{
|
||||
"must": mustClause,
|
||||
"filter": filterClauses,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// buildSkillKeywordQuery builds keyword-only search query for skill index
|
||||
// Skill index uses different field names: name_tks, tags_tks, description_tks, content_tks
|
||||
func buildSkillKeywordQuery(matchText string, filterClauses []map[string]interface{}, boost float64) map[string]interface{} {
|
||||
var mustClause map[string]interface{}
|
||||
|
||||
// Handle wildcard query (match all)
|
||||
if matchText == "*" || matchText == "" {
|
||||
mustClause = map[string]interface{}{
|
||||
"match_all": map[string]interface{}{},
|
||||
}
|
||||
} else {
|
||||
// Use query_string for complex queries with skill-specific fields
|
||||
queryString := map[string]interface{}{
|
||||
"query": matchText,
|
||||
"fields": []string{"name_tks^10", "tags_tks^5", "description_tks^3", "content_tks^1"},
|
||||
"type": "best_fields",
|
||||
"minimum_should_match": "30%",
|
||||
"boost": boost,
|
||||
}
|
||||
mustClause = map[string]interface{}{
|
||||
"query_string": queryString,
|
||||
}
|
||||
}
|
||||
|
||||
return map[string]interface{}{
|
||||
"bool": map[string]interface{}{
|
||||
"must": mustClause,
|
||||
"filter": filterClauses,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// convertESResponse converts ES SearchResponse to unified chunks format
|
||||
func convertESResponse(esResp *SearchResponse, vectorFieldName string) []map[string]interface{} {
|
||||
if esResp == nil || esResp.Hits.Hits == nil {
|
||||
return []map[string]interface{}{}
|
||||
}
|
||||
|
||||
chunks := make([]map[string]interface{}, len(esResp.Hits.Hits))
|
||||
for i, hit := range esResp.Hits.Hits {
|
||||
chunks[i] = hit.Source
|
||||
chunks[i]["_score"] = hit.Score
|
||||
chunks[i]["_id"] = hit.ID
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
// parseOrderByExpr parses the OrderBy expression into ES sort format
|
||||
func parseOrderByExpr(orderBy *types.OrderByExpr) []map[string]interface{} {
|
||||
if orderBy == nil || len(orderBy.Fields) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var result []map[string]interface{}
|
||||
for _, field := range orderBy.Fields {
|
||||
direction := "asc"
|
||||
if field.Type == types.SortDesc {
|
||||
direction = "desc"
|
||||
}
|
||||
|
||||
if field.Field == "_score" || field.Field == "score" {
|
||||
result = append(result, map[string]interface{}{
|
||||
"_score": direction,
|
||||
})
|
||||
} else {
|
||||
result = append(result, map[string]interface{}{
|
||||
field.Field: direction,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// Helper query builder functions (legacy)
|
||||
|
||||
// BuildMatchTextQuery builds a text match query
|
||||
func BuildMatchTextQuery(fields []string, text string, fuzziness string) map[string]interface{} {
|
||||
query := map[string]interface{}{
|
||||
"multi_match": map[string]interface{}{
|
||||
"query": text,
|
||||
"fields": fields,
|
||||
},
|
||||
}
|
||||
|
||||
if fuzziness != "" {
|
||||
if multiMatch, ok := query["multi_match"].(map[string]interface{}); ok {
|
||||
multiMatch["fuzziness"] = fuzziness
|
||||
}
|
||||
}
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
// BuildTermQuery builds a term query
|
||||
func BuildTermQuery(field string, value interface{}) map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"term": map[string]interface{}{
|
||||
field: value,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// BuildRangeQuery builds a range query
|
||||
func BuildRangeQuery(field string, from, to interface{}) map[string]interface{} {
|
||||
rangeQuery := make(map[string]interface{})
|
||||
if from != nil {
|
||||
rangeQuery["gte"] = from
|
||||
}
|
||||
if to != nil {
|
||||
rangeQuery["lte"] = to
|
||||
}
|
||||
|
||||
return map[string]interface{}{
|
||||
"range": map[string]interface{}{
|
||||
field: rangeQuery,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// BuildBoolQuery builds a bool query
|
||||
func BuildBoolQuery() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"bool": make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
// AddMust adds must clause to bool query
|
||||
func AddMust(query map[string]interface{}, clauses ...map[string]interface{}) {
|
||||
if boolQuery, ok := query["bool"].(map[string]interface{}); ok {
|
||||
if _, exists := boolQuery["must"]; !exists {
|
||||
boolQuery["must"] = []map[string]interface{}{}
|
||||
}
|
||||
if must, ok := boolQuery["must"].([]map[string]interface{}); ok {
|
||||
boolQuery["must"] = append(must, clauses...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddShould adds should clause to bool query
|
||||
func AddShould(query map[string]interface{}, clauses ...map[string]interface{}) {
|
||||
if boolQuery, ok := query["bool"].(map[string]interface{}); ok {
|
||||
if _, exists := boolQuery["should"]; !exists {
|
||||
boolQuery["should"] = []map[string]interface{}{}
|
||||
}
|
||||
if should, ok := boolQuery["should"].([]map[string]interface{}); ok {
|
||||
boolQuery["should"] = append(should, clauses...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddFilter adds filter clause to bool query
|
||||
func AddFilter(query map[string]interface{}, clauses ...map[string]interface{}) {
|
||||
if boolQuery, ok := query["bool"].(map[string]interface{}); ok {
|
||||
if _, exists := boolQuery["filter"]; !exists {
|
||||
boolQuery["filter"] = []map[string]interface{}{}
|
||||
}
|
||||
if filter, ok := boolQuery["filter"].([]map[string]interface{}); ok {
|
||||
boolQuery["filter"] = append(filter, clauses...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddMustNot adds must_not clause to bool query
|
||||
func AddMustNot(query map[string]interface{}, clauses ...map[string]interface{}) {
|
||||
if boolQuery, ok := query["bool"].(map[string]interface{}); ok {
|
||||
if _, exists := boolQuery["must_not"]; !exists {
|
||||
boolQuery["must_not"] = []map[string]interface{}{}
|
||||
}
|
||||
if mustNot, ok := boolQuery["must_not"].([]map[string]interface{}); ok {
|
||||
boolQuery["must_not"] = append(mustNot, clauses...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetFields is not implemented for Elasticsearch
|
||||
func (e *elasticsearchEngine) GetFields(chunks []map[string]interface{}, fields []string) map[string]map[string]interface{} {
|
||||
common.Warn("GetFields not implemented for Elasticsearch")
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAggregation is not implemented for Elasticsearch
|
||||
func (e *elasticsearchEngine) GetAggregation(chunks []map[string]interface{}, fieldName string) []map[string]interface{} {
|
||||
common.Warn("GetAggregation not implemented for Elasticsearch")
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetHighlight is not implemented for Elasticsearch
|
||||
func (e *elasticsearchEngine) GetHighlight(chunks []map[string]interface{}, keywords []string, fieldName string) map[string]string {
|
||||
common.Warn("GetHighlight not implemented for Elasticsearch")
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDocIDs is not implemented for Elasticsearch
|
||||
func (e *elasticsearchEngine) GetDocIDs(chunks []map[string]interface{}) []string {
|
||||
common.Warn("GetDocIDs not implemented for Elasticsearch")
|
||||
return nil
|
||||
}
|
||||
@@ -32,26 +32,23 @@ const (
|
||||
|
||||
// DocEngine document storage engine interface
|
||||
type DocEngine interface {
|
||||
// Search
|
||||
Search(ctx context.Context, req *types.SearchRequest) (*types.SearchResult, error)
|
||||
|
||||
// Dataset operations
|
||||
CreateDataset(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error
|
||||
InsertDataset(ctx context.Context, documents []map[string]interface{}, indexName string, knowledgebaseID string) ([]string, error)
|
||||
UpdateDataset(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, tableNamePrefix string, knowledgebaseID string) error
|
||||
|
||||
// Chunk operations
|
||||
GetChunk(ctx context.Context, indexName, chunkID string, kbIDs []string) (interface{}, error)
|
||||
CreateChunkStore(ctx context.Context, baseName, datasetID string, vectorSize int, parserID string) error
|
||||
InsertChunks(ctx context.Context, chunks []map[string]interface{}, baseName string, datasetID string) ([]string, error)
|
||||
UpdateChunks(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, baseName string, datasetID string) error
|
||||
DeleteChunks(ctx context.Context, condition map[string]interface{}, baseName string, datasetID string) (int64, error)
|
||||
Search(ctx context.Context, req *types.SearchRequest) (*types.SearchResult, error)
|
||||
GetChunk(ctx context.Context, baseName, chunkID string, datasetIDs []string) (interface{}, error)
|
||||
DropChunkStore(ctx context.Context, baseName, datasetID string) error
|
||||
ChunkStoreExists(ctx context.Context, baseName, datasetID string) (bool, error)
|
||||
|
||||
// Document metadata operations
|
||||
CreateMetadata(ctx context.Context, indexName string) error
|
||||
InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error)
|
||||
UpdateMetadata(ctx context.Context, docID string, kbID string, metaFields map[string]interface{}, tenantID string) error
|
||||
|
||||
// Operations for both dataset and metadata tables
|
||||
Delete(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error)
|
||||
DropTable(ctx context.Context, indexName string) error
|
||||
TableExists(ctx context.Context, indexName string) (bool, error)
|
||||
CreateMetadataStore(ctx context.Context, tenantID string) error
|
||||
InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error)
|
||||
UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error
|
||||
DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error)
|
||||
DropMetadataStore(ctx context.Context, tenantID string) error
|
||||
MetadataStoreExists(ctx context.Context, tenantID string) (bool, error)
|
||||
|
||||
// Document operations (used by skill indexing)
|
||||
IndexDocument(ctx context.Context, indexName, docID string, doc interface{}) error
|
||||
|
||||
2038
internal/engine/infinity/chunk.go
Normal file
2038
internal/engine/infinity/chunk.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -25,101 +25,58 @@ import (
|
||||
"strings"
|
||||
|
||||
infinity "github.com/infiniflow/infinity-go-sdk"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Delete deletes rows from either a dataset table or metadata table.
|
||||
// If indexName starts with "ragflow_doc_meta_", it's a metadata table.
|
||||
// Otherwise, it's a dataset table: {indexName}_{datasetID}
|
||||
func (e *infinityEngine) Delete(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) {
|
||||
var tableName string
|
||||
if strings.HasPrefix(indexName, "ragflow_doc_meta_") {
|
||||
tableName = indexName
|
||||
} else {
|
||||
tableName = fmt.Sprintf("%s_%s", indexName, datasetID)
|
||||
// dropTable drops a table from Infinity
|
||||
func (e *infinityEngine) dropTable(ctx context.Context, tableName string) error {
|
||||
if tableName == "" {
|
||||
return fmt.Errorf("table name cannot be empty")
|
||||
}
|
||||
|
||||
// Check if table exists
|
||||
exists, err := e.tableExists(ctx, tableName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check table existence: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return fmt.Errorf("table '%s' does not exist", tableName)
|
||||
}
|
||||
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get database: %w", err)
|
||||
return fmt.Errorf("failed to get database: %w", err)
|
||||
}
|
||||
|
||||
table, err := db.GetTable(tableName)
|
||||
_, err = db.DropTable(tableName, infinity.ConflictTypeError)
|
||||
if err != nil {
|
||||
common.Warn(fmt.Sprintf("Table %s does not exist, skipping delete", tableName))
|
||||
return 0, nil
|
||||
return fmt.Errorf("failed to drop table: %w", err)
|
||||
}
|
||||
|
||||
// Get table columns for building filter
|
||||
clmns := make(map[string]struct {
|
||||
Type string
|
||||
Default interface{}
|
||||
})
|
||||
colsResp, err := table.ShowColumns()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get columns: %w", err)
|
||||
}
|
||||
result, ok := colsResp.(*infinity.QueryResult)
|
||||
if ok {
|
||||
if nameArr, ok := result.Data["name"]; ok {
|
||||
if typeArr, ok := result.Data["type"]; ok {
|
||||
if defArr, ok := result.Data["default"]; ok {
|
||||
for i := 0; i < len(nameArr); i++ {
|
||||
colName, _ := nameArr[i].(string)
|
||||
colType, _ := typeArr[i].(string)
|
||||
var colDefault interface{}
|
||||
if i < len(defArr) {
|
||||
colDefault = defArr[i]
|
||||
}
|
||||
clmns[colName] = struct {
|
||||
Type string
|
||||
Default interface{}
|
||||
}{colType, colDefault}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build filter from condition
|
||||
filter := buildFilterFromCondition(condition, clmns)
|
||||
|
||||
delResp, err := table.Delete(filter)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to delete: %w", err)
|
||||
}
|
||||
|
||||
return delResp.DeletedRows, nil
|
||||
}
|
||||
|
||||
// DropTable deletes a table/index
|
||||
func (e *infinityEngine) DropTable(ctx context.Context, indexName string) error {
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get database: %w", err)
|
||||
}
|
||||
|
||||
_, err = db.DropTable(indexName, infinity.ConflictTypeIgnore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to drop table: %w", err)
|
||||
}
|
||||
common.Info("Infinity dropped table", zap.String("tableName", tableName))
|
||||
return nil
|
||||
}
|
||||
|
||||
// TableExists checks if table/index exists
|
||||
func (e *infinityEngine) TableExists(ctx context.Context, indexName string) (bool, error) {
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Failed to get database: %w", err)
|
||||
// tableExists checks if a table exists in Infinity
|
||||
func (e *infinityEngine) tableExists(ctx context.Context, tableName string) (bool, error) {
|
||||
if tableName == "" {
|
||||
return false, fmt.Errorf("table name cannot be empty")
|
||||
}
|
||||
|
||||
_, err = db.GetTable(indexName)
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
// Check if error is "table not found"
|
||||
errLower := strings.ToLower(err.Error())
|
||||
if strings.Contains(errLower, "not found") || strings.Contains(errLower, "notexist") || strings.Contains(errLower, "doesn't exist") {
|
||||
return false, fmt.Errorf("failed to get database: %w", err)
|
||||
}
|
||||
|
||||
// Try to get the table - if it exists, no error
|
||||
_, err = db.GetTable(tableName)
|
||||
if err != nil {
|
||||
errMsg := strings.ToLower(err.Error())
|
||||
if strings.Contains(errMsg, "not found") || strings.Contains(errMsg, "doesn't exist") {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
return false, fmt.Errorf("failed to check table existence: %w", err)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
@@ -335,3 +292,18 @@ func (e *infinityEngine) columnExists(table *infinity.Table, columnName string)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// buildChunkTableName returns the chunk table name for a dataset
|
||||
// Skill Table: table name is just baseName (e.g., "skill_abc123_def456")
|
||||
// Regular chunk Table: table name is {baseName}_{datasetID}
|
||||
func buildChunkTableName(baseName, datasetID string) string {
|
||||
if datasetID == "skill" {
|
||||
return baseName
|
||||
}
|
||||
return fmt.Sprintf("%s_%s", baseName, datasetID)
|
||||
}
|
||||
|
||||
// buildMetadataTableName returns the metadata table name for a tenant
|
||||
func buildMetadataTableName(tenantID string) string {
|
||||
return fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
|
||||
}
|
||||
|
||||
@@ -1,655 +0,0 @@
|
||||
//
|
||||
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package infinity
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"ragflow/internal/common"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"ragflow/internal/utility"
|
||||
|
||||
infinity "github.com/infiniflow/infinity-go-sdk"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// CreateDataset creates a table in Infinity
|
||||
// indexName is the table name prefix (e.g., "ragflow_<tenant_id>")
|
||||
// The full table name is built as "{indexName}_{datasetID}"
|
||||
// For skill index (datasetID="skill"), tableName is just indexName and uses skill_infinity_mapping.json
|
||||
func (e *infinityEngine) CreateDataset(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error {
|
||||
vecSize := vectorSize
|
||||
|
||||
// Determine table name and mapping file based on index type
|
||||
var tableName string
|
||||
var mappingFile string
|
||||
|
||||
if datasetID == "skill" {
|
||||
// Skill index: table name is just indexName (e.g., "skill_abc123_def456")
|
||||
tableName = indexName
|
||||
mappingFile = "skill_infinity_mapping.json"
|
||||
common.Info("Creating skill index table", zap.String("tableName", tableName), zap.String("mappingFile", mappingFile))
|
||||
} else {
|
||||
// Regular document index: table name is {indexName}_{datasetID}
|
||||
tableName = fmt.Sprintf("%s_%s", indexName, datasetID)
|
||||
mappingFile = e.mappingFileName
|
||||
common.Info("Creating regular index table", zap.String("tableName", tableName), zap.String("mappingFile", mappingFile))
|
||||
}
|
||||
|
||||
// Use configured schema
|
||||
fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", mappingFile)
|
||||
|
||||
schemaData, err := os.ReadFile(fpMapping)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to read mapping file: %w", err)
|
||||
}
|
||||
|
||||
var schema orderedFields
|
||||
if err := json.Unmarshal(schemaData, &schema); err != nil {
|
||||
return fmt.Errorf("Failed to parse mapping file: %w", err)
|
||||
}
|
||||
|
||||
// Get database
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get database: %w", err)
|
||||
}
|
||||
|
||||
// Determine vector column name
|
||||
vectorColName := fmt.Sprintf("q_%d_vec", vecSize)
|
||||
|
||||
// Check if table already exists
|
||||
exists, err := e.TableExists(ctx, tableName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to check if table exists: %w", err)
|
||||
}
|
||||
|
||||
var table *infinity.Table
|
||||
if exists {
|
||||
// Table exists, open it and check if vector column needs to be added
|
||||
common.Info("Table already exists, checking for vector column", zap.String("tableName", tableName))
|
||||
table, err = db.GetTable(tableName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to open existing table %s: %w", tableName, err)
|
||||
}
|
||||
|
||||
// Check if vector column exists (for embedding model changes)
|
||||
colExists, err := e.columnExists(table, vectorColName)
|
||||
if err != nil {
|
||||
common.Warn("Failed to check column existence", zap.String("column", vectorColName), zap.Error(err))
|
||||
}
|
||||
|
||||
// Add new vector column if it doesn't exist (handles embedding model change)
|
||||
if !colExists {
|
||||
common.Info("Adding new vector column for embedding model change", zap.String("column", vectorColName), zap.Int("size", vecSize))
|
||||
addColSchema := infinity.TableSchema{
|
||||
&infinity.ColumnDefinition{
|
||||
Name: vectorColName,
|
||||
DataType: fmt.Sprintf("vector,%d,float", vecSize),
|
||||
},
|
||||
}
|
||||
if _, err := table.AddColumns(addColSchema); err != nil {
|
||||
common.Error("Failed to add vector column "+vectorColName, err)
|
||||
return fmt.Errorf("Failed to add vector column %s: %w", vectorColName, err)
|
||||
}
|
||||
common.Info("Successfully added vector column", zap.String("column", vectorColName))
|
||||
}
|
||||
} else {
|
||||
// Table doesn't exist, create it with vector column in the initial schema
|
||||
common.Info(fmt.Sprintf("Creating table with vector column: %s with dimension %d", vectorColName, vecSize))
|
||||
|
||||
// Build column definitions (preserving JSON order)
|
||||
var columns infinity.TableSchema
|
||||
for _, fieldName := range schema.Keys {
|
||||
fieldInfo := schema.Fields[fieldName]
|
||||
col := infinity.ColumnDefinition{
|
||||
Name: fieldName,
|
||||
DataType: fieldInfo.Type,
|
||||
Default: fieldInfo.Default,
|
||||
// Comment: fieldInfo.Comment,
|
||||
}
|
||||
columns = append(columns, &col)
|
||||
}
|
||||
|
||||
// Add vector column
|
||||
columns = append(columns, &infinity.ColumnDefinition{
|
||||
Name: vectorColName,
|
||||
DataType: fmt.Sprintf("vector,%d,float", vecSize),
|
||||
})
|
||||
|
||||
// Add chunk_data column for table parser
|
||||
if parserID == "table" {
|
||||
columns = append(columns, &infinity.ColumnDefinition{
|
||||
Name: "chunk_data",
|
||||
DataType: "json",
|
||||
Default: "{}",
|
||||
})
|
||||
}
|
||||
|
||||
// Create table
|
||||
table, err = db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create table: %w", err)
|
||||
}
|
||||
common.Debug("Infinity created table", zap.String("tableName", tableName))
|
||||
}
|
||||
|
||||
// Create HNSW index on vector column with unique name based on vector size
|
||||
// Use unique index name to avoid conflict when embedding model changes
|
||||
vectorIndexName := fmt.Sprintf("q_%d_vec_idx", vecSize)
|
||||
_, err = table.CreateIndex(
|
||||
vectorIndexName,
|
||||
infinity.NewIndexInfo(vectorColName, infinity.IndexTypeHnsw, map[string]string{
|
||||
"M": "16",
|
||||
"ef_construction": "50",
|
||||
"metric": "cosine",
|
||||
"encode": "lvq",
|
||||
}),
|
||||
infinity.ConflictTypeIgnore,
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create HNSW index %s: %w", vectorIndexName, err)
|
||||
}
|
||||
common.Info("Created vector index", zap.String("indexName", vectorIndexName), zap.String("column", vectorColName))
|
||||
|
||||
// Create full-text indexes for varchar fields with analyzers
|
||||
for _, fieldName := range schema.Keys {
|
||||
fieldInfo := schema.Fields[fieldName]
|
||||
if fieldInfo.Type != "varchar" || fieldInfo.Analyzer == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
analyzers := []string{}
|
||||
switch a := fieldInfo.Analyzer.(type) {
|
||||
case string:
|
||||
analyzers = []string{a}
|
||||
case []interface{}:
|
||||
for _, v := range a {
|
||||
if s, ok := v.(string); ok {
|
||||
analyzers = append(analyzers, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, analyzer := range analyzers {
|
||||
indexNameFt := fmt.Sprintf("ft_%s_%s",
|
||||
regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(fieldName, "_"),
|
||||
regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(analyzer, "_"),
|
||||
)
|
||||
_, err = table.CreateIndex(
|
||||
indexNameFt,
|
||||
infinity.NewIndexInfo(fieldName, infinity.IndexTypeFullText, map[string]string{"ANALYZER": analyzer}),
|
||||
infinity.ConflictTypeIgnore,
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create fulltext index %s: %w", indexNameFt, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create secondary indexes for fields with index_type
|
||||
for _, fieldName := range schema.Keys {
|
||||
fieldInfo := schema.Fields[fieldName]
|
||||
if fieldInfo.IndexType == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
indexTypeStr := ""
|
||||
params := map[string]string{}
|
||||
|
||||
switch it := fieldInfo.IndexType.(type) {
|
||||
case string:
|
||||
indexTypeStr = it
|
||||
case map[string]interface{}:
|
||||
if t, ok := it["type"].(string); ok {
|
||||
indexTypeStr = t
|
||||
}
|
||||
if card, ok := it["cardinality"].(string); ok {
|
||||
params["cardinality"] = card
|
||||
}
|
||||
}
|
||||
|
||||
if indexTypeStr == "secondary" {
|
||||
indexNameSec := fmt.Sprintf("sec_%s", fieldName)
|
||||
_, err = table.CreateIndex(
|
||||
indexNameSec,
|
||||
infinity.NewIndexInfo(fieldName, infinity.IndexTypeSecondary, params),
|
||||
infinity.ConflictTypeIgnore,
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create secondary index %s: %w", indexNameSec, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = table // suppress unused variable warning
|
||||
return nil
|
||||
}
|
||||
|
||||
// InsertDataset inserts chunks into a dataset table
|
||||
// Table name format: {tableNamePrefix}_{knowledgebaseID}
|
||||
// Auto-create the table if it doesn't exist
|
||||
// Delete existing rows with matching IDs before insert
|
||||
func (e *infinityEngine) InsertDataset(ctx context.Context, chunks []map[string]interface{}, tableNamePrefix string, knowledgebaseID string) ([]string, error) {
|
||||
tableName := fmt.Sprintf("%s_%s", tableNamePrefix, knowledgebaseID)
|
||||
common.Info("InfinityConnection.InsertDataset called", zap.String("tableName", tableName), zap.Int("chunkCount", len(chunks)))
|
||||
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to get database: %w", err)
|
||||
}
|
||||
|
||||
table, err := db.GetTable(tableName)
|
||||
if err != nil {
|
||||
// Table doesn't exist, try to create it
|
||||
errMsg := strings.ToLower(err.Error())
|
||||
if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") {
|
||||
return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err)
|
||||
}
|
||||
|
||||
// Infer vector size from chunks
|
||||
vectorSize := 0
|
||||
vectorPattern := regexp.MustCompile(`q_(\d+)_vec`)
|
||||
for _, chunk := range chunks {
|
||||
for key := range chunk {
|
||||
matches := vectorPattern.FindStringSubmatch(key)
|
||||
if len(matches) >= 2 {
|
||||
vectorSize, _ = strconv.Atoi(matches[1])
|
||||
break
|
||||
}
|
||||
}
|
||||
if vectorSize > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if vectorSize == 0 {
|
||||
return nil, fmt.Errorf("cannot infer vector size from chunks")
|
||||
}
|
||||
|
||||
// Determine parser_id from chunk structure
|
||||
parserID := ""
|
||||
if chunkData, ok := chunks[0]["chunk_data"].(map[string]interface{}); ok && chunkData != nil {
|
||||
parserID = "table"
|
||||
}
|
||||
|
||||
// Create table
|
||||
if err := e.CreateDataset(ctx, tableNamePrefix, knowledgebaseID, vectorSize, parserID); err != nil {
|
||||
return nil, fmt.Errorf("Failed to create table: %w", err)
|
||||
}
|
||||
|
||||
table, err = db.GetTable(tableName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to get table after creation: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Get embedding columns and their sizes
|
||||
var embeddingCols [][2]interface{}
|
||||
colsResp, err := table.ShowColumns()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to get columns: %w", err)
|
||||
}
|
||||
result, ok := colsResp.(*infinity.QueryResult)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected response type: %T", colsResp)
|
||||
}
|
||||
|
||||
// ShowColumns returns a result set where Data contains arrays of column values
|
||||
re := regexp.MustCompile(`Embedding\([a-z]+,(\d+)\)`)
|
||||
if nameArr, ok := result.Data["name"]; ok {
|
||||
if typeArr, ok := result.Data["type"]; ok {
|
||||
for i := 0; i < len(nameArr); i++ {
|
||||
colName, _ := nameArr[i].(string)
|
||||
colType, _ := typeArr[i].(string)
|
||||
matches := re.FindStringSubmatch(colType)
|
||||
if len(matches) >= 2 {
|
||||
size, _ := strconv.Atoi(matches[1])
|
||||
embeddingCols = append(embeddingCols, [2]interface{}{colName, size})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Transform chunks using helper function
|
||||
insertChunks := make([]map[string]interface{}, len(chunks))
|
||||
for i, chunk := range chunks {
|
||||
insertChunks[i] = TransformChunkFields(chunk, embeddingCols)
|
||||
}
|
||||
|
||||
// Delete existing rows with matching IDs
|
||||
if len(insertChunks) > 0 {
|
||||
idList := make([]string, len(insertChunks))
|
||||
for i, chunk := range insertChunks {
|
||||
idList[i] = fmt.Sprintf("'%v'", chunk["id"])
|
||||
}
|
||||
filter := fmt.Sprintf("id IN (%s)", strings.Join(idList, ", "))
|
||||
common.Debug(fmt.Sprintf("Deleting existing rows with filter: %s", filter))
|
||||
delResp, delErr := table.Delete(filter)
|
||||
if delErr != nil {
|
||||
common.Warn(fmt.Sprintf("Failed to delete existing rows: %v", delErr))
|
||||
} else {
|
||||
common.Info(fmt.Sprintf("Deleted %d existing rows", delResp.DeletedRows))
|
||||
}
|
||||
}
|
||||
|
||||
// Insert chunks to dataset
|
||||
_, err = table.Insert(insertChunks)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to insert chunks to dataset: %w", err)
|
||||
}
|
||||
|
||||
common.Info("InfinityConnection.InsertDataset result", zap.String("tableName", tableName), zap.Int("count", len(insertChunks)))
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
// UpdateDataset updates chunks in a dataset table
|
||||
// Table name format: {tableNamePrefix}_{knowledgebaseID}
|
||||
func (e *infinityEngine) UpdateDataset(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, tableNamePrefix string, knowledgebaseID string) error {
|
||||
tableName := fmt.Sprintf("%s_%s", tableNamePrefix, knowledgebaseID)
|
||||
common.Info("InfinityConnection.UpdateDataset called", zap.String("tableName", tableName), zap.Any("condition", condition))
|
||||
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get database: %w", err)
|
||||
}
|
||||
|
||||
table, err := db.GetTable(tableName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get table %s: %w", tableName, err)
|
||||
}
|
||||
|
||||
// Get table columns
|
||||
clmns := make(map[string]struct {
|
||||
Type string
|
||||
Default interface{}
|
||||
})
|
||||
colsResp, err := table.ShowColumns()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get columns: %w", err)
|
||||
}
|
||||
result, ok := colsResp.(*infinity.QueryResult)
|
||||
if ok {
|
||||
if nameArr, ok := result.Data["name"]; ok {
|
||||
if typeArr, ok := result.Data["type"]; ok {
|
||||
if defArr, ok := result.Data["default"]; ok {
|
||||
for i := 0; i < len(nameArr); i++ {
|
||||
colName, _ := nameArr[i].(string)
|
||||
colType, _ := typeArr[i].(string)
|
||||
var colDefault interface{}
|
||||
if i < len(defArr) {
|
||||
colDefault = defArr[i]
|
||||
}
|
||||
clmns[colName] = struct {
|
||||
Type string
|
||||
Default interface{}
|
||||
}{colType, colDefault}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build filter string from condition
|
||||
filter := buildFilterFromCondition(condition, clmns)
|
||||
|
||||
// Process remove operation first
|
||||
removeValue := make(map[string]interface{})
|
||||
if removeData, ok := newValue["remove"].(map[string]interface{}); ok {
|
||||
removeValue = removeData
|
||||
}
|
||||
delete(newValue, "remove")
|
||||
|
||||
// Transform new_value fields using helper function (no embeddings needed for update)
|
||||
transformed := TransformChunkFields(newValue, nil)
|
||||
for k, v := range transformed {
|
||||
newValue[k] = v
|
||||
}
|
||||
|
||||
// Remove original fields that were transformed (they're now in transformed with new names/types)
|
||||
// Also remove intermediate token fields that shouldn't be stored in Infinity
|
||||
// This must match Python's delete list in infinity_conn.py
|
||||
for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks",
|
||||
"content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks",
|
||||
"question_kwd", "question_tks"} {
|
||||
delete(newValue, key)
|
||||
}
|
||||
|
||||
// Handle remove operations if any
|
||||
if len(removeValue) > 0 {
|
||||
colToRemove := make([]string, 0, len(removeValue))
|
||||
for k := range removeValue {
|
||||
colToRemove = append(colToRemove, k)
|
||||
}
|
||||
colToRemove = append(colToRemove, "id")
|
||||
|
||||
// Query rows to be updated
|
||||
queryResult, err := table.Output(colToRemove).Filter(filter).ToResult()
|
||||
if err != nil {
|
||||
common.Warn(fmt.Sprintf("Failed to query rows for remove operation: %v", err))
|
||||
} else {
|
||||
qr, ok := queryResult.(*infinity.QueryResult)
|
||||
if ok && len(qr.Data) > 0 {
|
||||
// Get the id column and columns to remove
|
||||
idCol := qr.Data["id"]
|
||||
removeOpt := make(map[string]map[string][]string) // column -> value -> [ids]
|
||||
|
||||
for colName, colData := range qr.Data {
|
||||
if colName == "id" {
|
||||
continue
|
||||
}
|
||||
removeVal := removeValue[colName]
|
||||
for i, id := range idCol {
|
||||
if i < len(colData) {
|
||||
existingVal := colData[i]
|
||||
if removeStr, ok := removeVal.(string); ok {
|
||||
// Split existing value by ### and remove the target value
|
||||
if existingStr, ok := existingVal.(string); ok {
|
||||
parts := strings.Split(existingStr, "###")
|
||||
var newParts []string
|
||||
for _, p := range parts {
|
||||
if p != removeStr {
|
||||
newParts = append(newParts, p)
|
||||
}
|
||||
}
|
||||
if len(newParts) != len(parts) {
|
||||
idStr := fmt.Sprintf("%v", id)
|
||||
if removeOpt[colName] == nil {
|
||||
removeOpt[colName] = make(map[string][]string)
|
||||
}
|
||||
removeOpt[colName][strings.Join(newParts, "###")] = append(removeOpt[colName][strings.Join(newParts, "###")], idStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Execute remove updates
|
||||
for colName, valueToIDs := range removeOpt {
|
||||
for newVal, ids := range valueToIDs {
|
||||
idFilter := filter + " AND id IN (" + strings.Join(ids, ", ") + ")"
|
||||
common.Info(fmt.Sprintf("INFINITY remove update: table=%s, idFilter=%s, column=%s, newValue=%v", tableName, idFilter, colName, newVal))
|
||||
_, err := table.Update(idFilter, map[string]interface{}{colName: newVal})
|
||||
if err != nil {
|
||||
common.Warn(fmt.Sprintf("Failed to remove value from column %s: %v", colName, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Execute the main update
|
||||
common.Info(fmt.Sprintf("INFINITY update: table=%s, filter=%s, newValue=%v", tableName, filter, newValue))
|
||||
_, err = table.Update(filter, newValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to update chunks: %w", err)
|
||||
}
|
||||
|
||||
common.Info("InfinityConnection.UpdateDataset completes", zap.String("tableName", tableName))
|
||||
return nil
|
||||
}
|
||||
|
||||
// TransformChunkFields transforms chunk field name for insert/update
|
||||
// It handles field name conversions and value transformations:
|
||||
// - docnm_kwd -> docnm
|
||||
// - title_kwd/title_sm_tks -> docnm (if docnm_kwd not set)
|
||||
// - important_kwd -> important_keywords (+ important_kwd_empty_count)
|
||||
// - content_with_weight/content_ltks/content_sm_ltks -> content
|
||||
// - authors_tks/authors_sm_tks -> authors
|
||||
// - question_kwd -> questions (joined with \n), question_tks -> questions (if question_kwd not set)
|
||||
// - kb_id: list -> str (first element)
|
||||
// - position_int: list -> hex_joined string
|
||||
// - page_num_int, top_int: list -> hex string
|
||||
// - *_feas fields -> JSON string
|
||||
// - keyword fields with list values -> ### joined string
|
||||
// - chunk_data: dict -> JSON string
|
||||
// - Missing embeddings filled with zeros if embeddingCols provided
|
||||
func TransformChunkFields(chunk map[string]interface{}, embeddingCols [][2]interface{}) map[string]interface{} {
|
||||
d := make(map[string]interface{})
|
||||
|
||||
for k, v := range chunk {
|
||||
switch k {
|
||||
case "docnm_kwd":
|
||||
d["docnm"] = v
|
||||
case "title_kwd":
|
||||
if _, exists := chunk["docnm_kwd"]; !exists {
|
||||
d["docnm"] = utility.ConvertToString(v)
|
||||
}
|
||||
case "title_sm_tks":
|
||||
if _, exists := chunk["docnm_kwd"]; !exists {
|
||||
d["docnm"] = utility.ConvertToString(v)
|
||||
}
|
||||
case "important_kwd":
|
||||
if list, ok := v.([]interface{}); ok {
|
||||
emptyCount := 0
|
||||
tokens := make([]string, 0)
|
||||
for _, item := range list {
|
||||
if str, ok := item.(string); ok {
|
||||
if str == "" {
|
||||
emptyCount++
|
||||
} else {
|
||||
tokens = append(tokens, str)
|
||||
}
|
||||
}
|
||||
}
|
||||
d["important_keywords"] = strings.Join(tokens, ",")
|
||||
d["important_kwd_empty_count"] = emptyCount
|
||||
} else {
|
||||
d["important_keywords"] = utility.ConvertToString(v)
|
||||
}
|
||||
case "important_tks":
|
||||
if _, exists := chunk["important_kwd"]; !exists {
|
||||
d["important_keywords"] = v
|
||||
}
|
||||
case "content_with_weight":
|
||||
d["content"] = v
|
||||
case "content_ltks":
|
||||
if _, exists := chunk["content_with_weight"]; !exists {
|
||||
d["content"] = v
|
||||
}
|
||||
case "content_sm_ltks":
|
||||
if _, exists := chunk["content_with_weight"]; !exists {
|
||||
d["content"] = v
|
||||
}
|
||||
case "authors_tks":
|
||||
d["authors"] = v
|
||||
case "authors_sm_tks":
|
||||
if _, exists := chunk["authors_tks"]; !exists {
|
||||
d["authors"] = v
|
||||
}
|
||||
case "question_kwd":
|
||||
d["questions"] = strings.Join(utility.ConvertToStringSlice(v), "\n")
|
||||
case "tag_kwd":
|
||||
d["tag_kwd"] = strings.Join(utility.ConvertToStringSlice(v), "###")
|
||||
case "question_tks":
|
||||
if _, exists := chunk["question_kwd"]; !exists {
|
||||
d["questions"] = utility.ConvertToString(v)
|
||||
}
|
||||
case "kb_id":
|
||||
if list, ok := v.([]interface{}); ok && len(list) > 0 {
|
||||
d["kb_id"] = list[0]
|
||||
} else {
|
||||
d["kb_id"] = v
|
||||
}
|
||||
case "position_int":
|
||||
if list, ok := v.([]interface{}); ok {
|
||||
d["position_int"] = utility.ConvertPositionIntArrayToHex(list)
|
||||
} else {
|
||||
d["position_int"] = v
|
||||
}
|
||||
case "page_num_int", "top_int":
|
||||
if list, ok := v.([]interface{}); ok {
|
||||
d[k] = utility.ConvertIntArrayToHex(list)
|
||||
} else {
|
||||
d[k] = v
|
||||
}
|
||||
case "chunk_data":
|
||||
d["chunk_data"] = utility.ConvertMapToJSONString(v)
|
||||
default:
|
||||
// Check for *_feas fields
|
||||
if strings.HasSuffix(k, "_feas") {
|
||||
jsonBytes, _ := json.Marshal(v)
|
||||
d[k] = string(jsonBytes)
|
||||
} else if fieldKeyword(k) {
|
||||
// keyword fields with list values -> ### joined
|
||||
if list, ok := v.([]interface{}); ok {
|
||||
d[k] = strings.Join(utility.ConvertToStringSlice(list), "###")
|
||||
} else {
|
||||
d[k] = v
|
||||
}
|
||||
} else {
|
||||
d[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove intermediate token fields
|
||||
for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks",
|
||||
"content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks",
|
||||
"question_kwd", "question_tks"} {
|
||||
delete(d, key)
|
||||
}
|
||||
|
||||
// Fill missing embedding columns with zeros if embedding info provided
|
||||
for _, ec := range embeddingCols {
|
||||
name, ok1 := ec[0].(string)
|
||||
size, ok2 := ec[1].(int)
|
||||
if !ok1 || !ok2 {
|
||||
continue
|
||||
}
|
||||
if _, exists := d[name]; !exists {
|
||||
zeros := make([]float64, size)
|
||||
for i := range zeros {
|
||||
zeros[i] = 0
|
||||
}
|
||||
d[name] = zeros
|
||||
}
|
||||
}
|
||||
|
||||
return d
|
||||
}
|
||||
@@ -1,303 +0,0 @@
|
||||
//
|
||||
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package infinity
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"ragflow/internal/common"
|
||||
"strings"
|
||||
|
||||
"ragflow/internal/utility"
|
||||
|
||||
infinity "github.com/infiniflow/infinity-go-sdk"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// GetChunk gets a chunk by ID
|
||||
func (e *infinityEngine) GetChunk(ctx context.Context, tableName, chunkID string, kbIDs []string) (interface{}, error) {
|
||||
if e.client == nil || e.client.conn == nil {
|
||||
return nil, fmt.Errorf("Infinity client not initialized")
|
||||
}
|
||||
|
||||
// Build list of table names to search
|
||||
var tableNames []string
|
||||
if strings.HasPrefix(tableName, "ragflow_doc_meta_") {
|
||||
tableNames = []string{tableName}
|
||||
} else {
|
||||
// Search in tables like <tableName>_<kb_id> for each kbID
|
||||
if len(kbIDs) > 0 {
|
||||
for _, kbID := range kbIDs {
|
||||
tableNames = append(tableNames, fmt.Sprintf("%s_%s", tableName, kbID))
|
||||
}
|
||||
}
|
||||
// Also try the base tableName
|
||||
tableNames = append(tableNames, tableName)
|
||||
}
|
||||
|
||||
// Try each table and collect results from all tables
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get database: %w", err)
|
||||
}
|
||||
|
||||
// Collect chunks from all tables (same as Python's concat_dataframes)
|
||||
allChunks := make(map[string]map[string]interface{})
|
||||
|
||||
for _, tblName := range tableNames {
|
||||
table, err := db.GetTable(tblName)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Query with filter for the specific chunk ID
|
||||
filter := fmt.Sprintf("id = '%s'", chunkID)
|
||||
result, err := table.Output([]string{"*"}).Filter(filter).ToResult()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
qr, ok := result.(*infinity.QueryResult)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(qr.Data) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert to chunk format
|
||||
chunks := make([]map[string]interface{}, 0)
|
||||
for colName, colData := range qr.Data {
|
||||
for i, val := range colData {
|
||||
for len(chunks) <= i {
|
||||
chunks = append(chunks, make(map[string]interface{}))
|
||||
}
|
||||
chunks[i][colName] = val
|
||||
}
|
||||
}
|
||||
|
||||
// Merge chunks into allChunks (by id), keeping first non-empty value
|
||||
for _, chunk := range chunks {
|
||||
if idVal, ok := chunk["id"].(string); ok {
|
||||
if existing, exists := allChunks[idVal]; exists {
|
||||
// Merge: keep first non-empty value for each field
|
||||
for k, v := range chunk {
|
||||
if _, has := existing[k]; !has || utility.IsEmpty(v) {
|
||||
existing[k] = v
|
||||
}
|
||||
}
|
||||
} else {
|
||||
allChunks[idVal] = chunk
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the chunk by chunkID
|
||||
chunk, found := allChunks[chunkID]
|
||||
if !found {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
common.Debug("infinity get chunk", zap.String("chunkID", chunkID), zap.Any("tables", tableNames))
|
||||
|
||||
// Apply field mappings (same as in GetFields)
|
||||
// docnm -> docnm_kwd, title_tks, title_sm_tks
|
||||
if val, ok := chunk["docnm"].(string); ok {
|
||||
chunk["docnm_kwd"] = val
|
||||
chunk["title_tks"] = val
|
||||
chunk["title_sm_tks"] = val
|
||||
}
|
||||
|
||||
// content -> content_with_weight, content_ltks, content_sm_ltks
|
||||
if val, ok := chunk["content"].(string); ok {
|
||||
chunk["content_with_weight"] = val
|
||||
chunk["content_ltks"] = val
|
||||
chunk["content_sm_ltks"] = val
|
||||
}
|
||||
|
||||
// important_keywords -> important_kwd (split by comma), important_tks
|
||||
if val, ok := chunk["important_keywords"].(string); ok {
|
||||
if val == "" {
|
||||
chunk["important_kwd"] = []interface{}{}
|
||||
} else {
|
||||
parts := strings.Split(val, ",")
|
||||
chunk["important_kwd"] = parts
|
||||
}
|
||||
chunk["important_tks"] = val
|
||||
} else {
|
||||
chunk["important_kwd"] = []interface{}{}
|
||||
chunk["important_tks"] = []interface{}{}
|
||||
}
|
||||
|
||||
// questions -> question_kwd (split by newline), question_tks
|
||||
if val, ok := chunk["questions"].(string); ok {
|
||||
if val == "" {
|
||||
chunk["question_kwd"] = []interface{}{}
|
||||
} else {
|
||||
parts := strings.Split(val, "\n")
|
||||
chunk["question_kwd"] = parts
|
||||
}
|
||||
chunk["question_tks"] = val
|
||||
} else {
|
||||
chunk["question_kwd"] = []interface{}{}
|
||||
chunk["question_tks"] = []interface{}{}
|
||||
}
|
||||
|
||||
if posVal, ok := chunk["position_int"].(string); ok {
|
||||
chunk["position_int"] = utility.ConvertHexToPositionIntArray(posVal)
|
||||
} else {
|
||||
chunk["position_int"] = []interface{}{}
|
||||
}
|
||||
|
||||
return chunk, nil
|
||||
}
|
||||
|
||||
// GetFields applies field mappings to chunks and returns a dict keyed by chunk ID.
|
||||
// Equivalent to Python's get_fields() in infinity_conn.py.
|
||||
// When fields is nil/empty, returns all fields from chunks.
|
||||
func GetFields(chunks []map[string]interface{}, fields []string) map[string]map[string]interface{} {
|
||||
result := make(map[string]map[string]interface{})
|
||||
if len(chunks) == 0 {
|
||||
return result
|
||||
}
|
||||
|
||||
// If fields is provided, create a set for lookup
|
||||
fieldSet := make(map[string]bool)
|
||||
for _, f := range fields {
|
||||
fieldSet[f] = true
|
||||
}
|
||||
|
||||
for _, chunk := range chunks {
|
||||
// Apply field mappings
|
||||
// docnm -> docnm_kwd, title_tks, title_sm_tks
|
||||
if val, ok := chunk["docnm"].(string); ok {
|
||||
chunk["docnm_kwd"] = val
|
||||
chunk["title_tks"] = val
|
||||
chunk["title_sm_tks"] = val
|
||||
}
|
||||
|
||||
// important_keywords -> important_kwd (split by comma), important_tks
|
||||
if val, ok := chunk["important_keywords"].(string); ok {
|
||||
if val == "" {
|
||||
chunk["important_kwd"] = []interface{}{}
|
||||
} else {
|
||||
parts := strings.Split(val, ",")
|
||||
chunk["important_kwd"] = parts
|
||||
}
|
||||
chunk["important_tks"] = val
|
||||
} else {
|
||||
chunk["important_kwd"] = []interface{}{}
|
||||
chunk["important_tks"] = []interface{}{}
|
||||
}
|
||||
|
||||
// questions -> question_kwd (split by newline), question_tks
|
||||
if val, ok := chunk["questions"].(string); ok {
|
||||
if val == "" {
|
||||
chunk["question_kwd"] = []interface{}{}
|
||||
} else {
|
||||
parts := strings.Split(val, "\n")
|
||||
chunk["question_kwd"] = parts
|
||||
}
|
||||
chunk["question_tks"] = val
|
||||
} else {
|
||||
chunk["question_kwd"] = []interface{}{}
|
||||
chunk["question_tks"] = []interface{}{}
|
||||
}
|
||||
|
||||
// content -> content_with_weight, content_ltks, content_sm_ltks
|
||||
if val, ok := chunk["content"].(string); ok {
|
||||
chunk["content_with_weight"] = val
|
||||
chunk["content_ltks"] = val
|
||||
chunk["content_sm_ltks"] = val
|
||||
}
|
||||
|
||||
// authors -> authors_tks, authors_sm_tks
|
||||
if val, ok := chunk["authors"].(string); ok {
|
||||
chunk["authors_tks"] = val
|
||||
chunk["authors_sm_tks"] = val
|
||||
}
|
||||
|
||||
// position_int: convert from hex string to array format (grouped by 5)
|
||||
if val, ok := chunk["position_int"].(string); ok {
|
||||
chunk["position_int"] = utility.ConvertHexToPositionIntArray(val)
|
||||
}
|
||||
|
||||
// Convert page_num_int and top_int from hex string to array
|
||||
for _, colName := range []string{"page_num_int", "top_int"} {
|
||||
if val, ok := chunk[colName].(string); ok && val != "" {
|
||||
chunk[colName] = utility.ConvertHexToIntArray(val)
|
||||
}
|
||||
}
|
||||
|
||||
// Post-process: convert nil/empty values to empty slices for array-like fields
|
||||
// and split _kwd fields by "###" (except knowledge_graph_kwd, docnm_kwd, important_kwd, question_kwd)
|
||||
kwdNoSplit := map[string]bool{
|
||||
"knowledge_graph_kwd": true, "docnm_kwd": true,
|
||||
"important_kwd": true, "question_kwd": true,
|
||||
}
|
||||
arrayFields := []string{
|
||||
"doc_type_kwd", "important_kwd", "important_tks", "question_tks",
|
||||
"question_kwd", "authors_tks", "authors_sm_tks", "title_tks",
|
||||
"title_sm_tks", "content_ltks", "content_sm_ltks", "tag_kwd",
|
||||
}
|
||||
for _, colName := range arrayFields {
|
||||
val, ok := chunk[colName]
|
||||
if !ok || val == nil || val == "" {
|
||||
chunk[colName] = []interface{}{}
|
||||
} else if !kwdNoSplit[colName] {
|
||||
// Split by "###" for _kwd fields
|
||||
if strVal, ok := val.(string); ok && strings.Contains(strVal, "###") {
|
||||
parts := strings.Split(strVal, "###")
|
||||
var filtered []interface{}
|
||||
for _, p := range parts {
|
||||
if p != "" {
|
||||
filtered = append(filtered, p)
|
||||
}
|
||||
}
|
||||
chunk[colName] = filtered
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle row_id mapping - Infinity returns "ROW_ID" but we use "row_id()"
|
||||
if val, ok := chunk["ROW_ID"]; ok {
|
||||
chunk["row_id()"] = val
|
||||
delete(chunk, "ROW_ID")
|
||||
}
|
||||
|
||||
// Build result map keyed by id
|
||||
if id, ok := chunk["id"].(string); ok {
|
||||
fieldMap := make(map[string]interface{})
|
||||
for field, value := range chunk {
|
||||
if len(fieldSet) == 0 || fieldSet[field] {
|
||||
fieldMap[field] = value
|
||||
}
|
||||
}
|
||||
result[id] = fieldMap
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// GetFields is a method wrapper for infinityEngine to satisfy DocEngine interface
|
||||
func (e *infinityEngine) GetFields(chunks []map[string]interface{}, fields []string) map[string]map[string]interface{} {
|
||||
return GetFields(chunks, fields)
|
||||
}
|
||||
@@ -22,18 +22,20 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"ragflow/internal/common"
|
||||
"strings"
|
||||
|
||||
"ragflow/internal/utility"
|
||||
|
||||
infinity "github.com/infiniflow/infinity-go-sdk"
|
||||
"ragflow/internal/common"
|
||||
"ragflow/internal/utility"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// CreateMetadata creates the document metadata table/index
|
||||
func (e *infinityEngine) CreateMetadata(ctx context.Context, indexName string) error {
|
||||
// CreateMetadataStore creates a metadata table in Infinity
|
||||
// tenantID is the tenant identifier used to build the table name
|
||||
func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID string) error {
|
||||
tableName := buildMetadataTableName(tenantID)
|
||||
|
||||
// Get database
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
@@ -41,12 +43,12 @@ func (e *infinityEngine) CreateMetadata(ctx context.Context, indexName string) e
|
||||
}
|
||||
|
||||
// Check if table already exists
|
||||
exists, err := e.TableExists(ctx, indexName)
|
||||
exists, err := e.tableExists(ctx, tableName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to check if table exists: %w", err)
|
||||
}
|
||||
if exists {
|
||||
return fmt.Errorf("metadata table '%s' already exists", indexName)
|
||||
return fmt.Errorf("metadata table '%s' already exists", tableName)
|
||||
}
|
||||
|
||||
// Use configured doc_meta mapping file
|
||||
@@ -69,27 +71,26 @@ func (e *infinityEngine) CreateMetadata(ctx context.Context, indexName string) e
|
||||
Name: fieldName,
|
||||
DataType: fieldInfo.Type,
|
||||
Default: fieldInfo.Default,
|
||||
// Comment: fieldInfo.Comment,
|
||||
}
|
||||
columns = append(columns, &col)
|
||||
}
|
||||
|
||||
// Create table
|
||||
_, err = db.CreateTable(indexName, columns, infinity.ConflictTypeIgnore)
|
||||
_, err = db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create doc meta table: %w", err)
|
||||
}
|
||||
common.Debug("Infinity created doc meta table", zap.String("tableName", indexName))
|
||||
common.Debug("Infinity created doc meta table", zap.String("tableName", tableName))
|
||||
|
||||
// Get table for creating indexes
|
||||
table, err := db.GetTable(indexName)
|
||||
table, err := db.GetTable(tableName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get table: %w", err)
|
||||
}
|
||||
|
||||
// Create secondary index on id
|
||||
_, err = table.CreateIndex(
|
||||
fmt.Sprintf("idx_%s_id", indexName),
|
||||
fmt.Sprintf("idx_%s_id", tableName),
|
||||
infinity.NewIndexInfo("id", infinity.IndexTypeSecondary, nil),
|
||||
infinity.ConflictTypeIgnore,
|
||||
"",
|
||||
@@ -100,7 +101,7 @@ func (e *infinityEngine) CreateMetadata(ctx context.Context, indexName string) e
|
||||
|
||||
// Create secondary index on kb_id
|
||||
_, err = table.CreateIndex(
|
||||
fmt.Sprintf("idx_%s_kb_id", indexName),
|
||||
fmt.Sprintf("idx_%s_kb_id", tableName),
|
||||
infinity.NewIndexInfo("kb_id", infinity.IndexTypeSecondary, nil),
|
||||
infinity.ConflictTypeIgnore,
|
||||
"",
|
||||
@@ -113,11 +114,10 @@ func (e *infinityEngine) CreateMetadata(ctx context.Context, indexName string) e
|
||||
}
|
||||
|
||||
// InsertMetadata inserts document metadata into tenant's metadata table
|
||||
// Table name format: ragflow_doc_meta_{tenant_id}
|
||||
// Auto-create the table if it doesn't exist
|
||||
// Replace existing metadata with same id and kb_id
|
||||
func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) {
|
||||
tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
|
||||
tableName := buildMetadataTableName(tenantID)
|
||||
common.Info("InfinityConnection.InsertMetadata called", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata)))
|
||||
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
@@ -134,7 +134,7 @@ func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[stri
|
||||
}
|
||||
|
||||
// Create metadata table
|
||||
if createErr := e.CreateMetadata(ctx, tableName); createErr != nil {
|
||||
if createErr := e.CreateMetadataStore(ctx, tenantID); createErr != nil {
|
||||
return nil, fmt.Errorf("Failed to create metadata table: %w", createErr)
|
||||
}
|
||||
|
||||
@@ -188,12 +188,11 @@ func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[stri
|
||||
}
|
||||
|
||||
// UpdateMetadata updates or inserts document metadata in tenant's metadata table.
|
||||
// If a row with the given docID and kbID exists, it merges the new metadata with existing.
|
||||
// If a row with the given docID and datasetID exists, it merges the new metadata with existing.
|
||||
// If no row exists, it inserts a new row.
|
||||
// Table name format: ragflow_doc_meta_{tenant_id}
|
||||
func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, kbID string, metaFields map[string]interface{}, tenantID string) error {
|
||||
tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
|
||||
common.Info("InfinityConnection.UpdateMetadata called", zap.String("tableName", tableName), zap.String("docID", docID), zap.String("kbID", kbID))
|
||||
func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error {
|
||||
tableName := buildMetadataTableName(tenantID)
|
||||
common.Info("InfinityConnection.UpdateMetadata called", zap.String("tableName", tableName), zap.String("docID", docID), zap.String("datasetID", datasetID))
|
||||
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
@@ -205,10 +204,10 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, kbID
|
||||
return fmt.Errorf("failed to get metadata table %s: %w", tableName, err)
|
||||
}
|
||||
|
||||
// Build filter to find existing row by docID and kbID
|
||||
// Build filter to find existing row by docID and datasetID
|
||||
escapedDocID := strings.ReplaceAll(docID, "'", "''")
|
||||
escapedKbID := strings.ReplaceAll(kbID, "'", "''")
|
||||
filter := fmt.Sprintf("id = '%s' AND kb_id = '%s'", escapedDocID, escapedKbID)
|
||||
escapedDatasetID := strings.ReplaceAll(datasetID, "'", "''")
|
||||
filter := fmt.Sprintf("id = '%s' AND kb_id = '%s'", escapedDocID, escapedDatasetID)
|
||||
|
||||
// Query existing metadata using the chainable API
|
||||
queryTable := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0)
|
||||
@@ -271,7 +270,7 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, kbID
|
||||
// Row doesn't exist: insert new row
|
||||
insertFields := map[string]interface{}{
|
||||
"id": docID,
|
||||
"kb_id": kbID,
|
||||
"kb_id": datasetID,
|
||||
"meta_fields": utility.ConvertMapToJSONString(metaFields),
|
||||
}
|
||||
common.Info(fmt.Sprintf("UpdateMetadata: inserting new row, table=%s, newValue=%v", tableName, insertFields))
|
||||
@@ -284,3 +283,72 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, kbID
|
||||
common.Info("InfinityConnection.UpdateMetadata completes", zap.String("tableName", tableName), zap.String("docID", docID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMetadata deletes metadata from tenant's metadata table by condition
|
||||
func (e *infinityEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
|
||||
tableName := buildMetadataTableName(tenantID)
|
||||
|
||||
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get database: %w", err)
|
||||
}
|
||||
|
||||
table, err := db.GetTable(tableName)
|
||||
if err != nil {
|
||||
common.Warn(fmt.Sprintf("Metadata table %s does not exist, skipping delete", tableName))
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Get table columns for building filter
|
||||
clmns := make(map[string]struct {
|
||||
Type string
|
||||
Default interface{}
|
||||
})
|
||||
colsResp, err := table.ShowColumns()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get columns: %w", err)
|
||||
}
|
||||
result, ok := colsResp.(*infinity.QueryResult)
|
||||
if ok {
|
||||
if nameArr, ok := result.Data["name"]; ok {
|
||||
if typeArr, ok := result.Data["type"]; ok {
|
||||
if defArr, ok := result.Data["default"]; ok {
|
||||
for i := 0; i < len(nameArr); i++ {
|
||||
colName, _ := nameArr[i].(string)
|
||||
colType, _ := typeArr[i].(string)
|
||||
var colDefault interface{}
|
||||
if i < len(defArr) {
|
||||
colDefault = defArr[i]
|
||||
}
|
||||
clmns[colName] = struct {
|
||||
Type string
|
||||
Default interface{}
|
||||
}{colType, colDefault}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build filter from condition
|
||||
filter := buildFilterFromCondition(condition, clmns)
|
||||
|
||||
delResp, err := table.Delete(filter)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to delete metadata: %w", err)
|
||||
}
|
||||
|
||||
return delResp.DeletedRows, nil
|
||||
}
|
||||
|
||||
// DropMetadataStore drops a metadata table from Infinity
|
||||
func (e *infinityEngine) DropMetadataStore(ctx context.Context, tenantID string) error {
|
||||
tableName := buildMetadataTableName(tenantID)
|
||||
return e.dropTable(ctx, tableName)
|
||||
}
|
||||
|
||||
// MetadataStoreExists checks if a metadata table exists in Infinity
|
||||
func (e *infinityEngine) MetadataStoreExists(ctx context.Context, tenantID string) (bool, error) {
|
||||
tableName := buildMetadataTableName(tenantID)
|
||||
return e.tableExists(ctx, tableName)
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -236,7 +236,7 @@ func (h *DatasetsHandler) GetKnowledgeGraph(c *gin.Context) {
|
||||
}
|
||||
|
||||
indexName := fmt.Sprintf("ragflow_%s", tenantID)
|
||||
exists, err := docEngine.TableExists(c.Request.Context(), indexName)
|
||||
exists, err := docEngine.ChunkStoreExists(c.Request.Context(), indexName, datasetID)
|
||||
if err != nil {
|
||||
jsonError(c, common.CodeServerError, err.Error())
|
||||
return
|
||||
@@ -335,7 +335,7 @@ func (h *DatasetsHandler) DeleteKnowledgeGraph(c *gin.Context) {
|
||||
}
|
||||
|
||||
indexName := fmt.Sprintf("ragflow_%s", tenantID)
|
||||
if _, err := docEngine.Delete(c.Request.Context(), map[string]interface{}{
|
||||
if _, err := docEngine.DeleteChunks(c.Request.Context(), map[string]interface{}{
|
||||
"knowledge_graph_kwd": []string{"graph", "subgraph", "entity", "relation", "community_report"},
|
||||
}, indexName, datasetID); err != nil {
|
||||
jsonError(c, common.CodeServerError, err.Error())
|
||||
|
||||
@@ -650,7 +650,7 @@ func (h *KnowledgebaseHandler) InsertDatasetFromFile(c *gin.Context) {
|
||||
|
||||
// Get the document engine and insert
|
||||
docEngine := engine.Get()
|
||||
result, err := docEngine.InsertDataset(c.Request.Context(), debugFormat.Chunks, debugFormat.TableNamePrefix, debugFormat.KnowledgebaseID)
|
||||
result, err := docEngine.InsertChunks(c.Request.Context(), debugFormat.Chunks, debugFormat.TableNamePrefix, debugFormat.KnowledgebaseID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{
|
||||
"code": 500,
|
||||
|
||||
@@ -890,7 +890,7 @@ func (s *ChunkService) UpdateChunk(req *UpdateChunkRequest, userID string) error
|
||||
"id": req.ChunkID,
|
||||
}
|
||||
|
||||
err = s.docEngine.UpdateDataset(ctx, condition, d, indexName, req.DatasetID)
|
||||
err = s.docEngine.UpdateChunks(ctx, condition, d, indexName, req.DatasetID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update chunk: %w", err)
|
||||
}
|
||||
@@ -970,7 +970,7 @@ func (s *ChunkService) RemoveChunks(req *RemoveChunksRequest, userID string) (in
|
||||
return 0, fmt.Errorf("either chunk_ids or delete_all must be provided")
|
||||
}
|
||||
|
||||
deletedCount, err := s.docEngine.Delete(ctx, condition, indexName, doc.KbID)
|
||||
deletedCount, err := s.docEngine.DeleteChunks(ctx, condition, indexName, doc.KbID)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to delete chunks: %w", err)
|
||||
}
|
||||
|
||||
@@ -662,7 +662,7 @@ func (s *FileService) deleteDocumentFromEngine(ctx context.Context, doc *entity.
|
||||
reqCtx, cancel := context.WithTimeout(ctx, 300*time.Second)
|
||||
defer cancel()
|
||||
condition := map[string]interface{}{"doc_id": doc.ID}
|
||||
if _, err := docEngine.Delete(reqCtx, condition, indexName, doc.KbID); err != nil {
|
||||
if _, err := docEngine.DeleteChunks(reqCtx, condition, indexName, doc.KbID); err != nil {
|
||||
return fmt.Errorf("delete document from engine: %w", err)
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -111,7 +111,7 @@ func (s *KnowledgebaseService) CreateDatasetInDocEngine(req *CreateDatasetTableR
|
||||
|
||||
// Call document engine to create table
|
||||
// Full table name will be built as "{tableName}_{kb_id}"
|
||||
err = s.docEngine.CreateDataset(context.Background(), tableName, req.KBID, vecSize, req.ParserID)
|
||||
err = s.docEngine.CreateChunkStore(context.Background(), tableName, req.KBID, vecSize, req.ParserID)
|
||||
if err != nil {
|
||||
return nil, common.CodeServerError, fmt.Errorf("failed to create dataset: %w", err)
|
||||
}
|
||||
@@ -131,11 +131,9 @@ func (s *KnowledgebaseService) DeleteDatasetInDocEngine(kbID string) (common.Err
|
||||
return common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID)
|
||||
}
|
||||
|
||||
// Build table name: ragflow_<tenant_id>_<kb_id>
|
||||
tableName := fmt.Sprintf("ragflow_%s_%s", kb.TenantID, kbID)
|
||||
|
||||
// Call document engine to delete table
|
||||
err = s.docEngine.DropTable(context.Background(), tableName)
|
||||
err = s.docEngine.DropChunkStore(context.Background(), fmt.Sprintf("ragflow_%s", kb.TenantID), kbID)
|
||||
|
||||
if err != nil {
|
||||
return common.CodeServerError, fmt.Errorf("failed to delete table: %w", err)
|
||||
}
|
||||
@@ -285,7 +283,7 @@ func (s *KnowledgebaseService) Accessible(kbID, userID string) bool {
|
||||
|
||||
// RemoveTag removes a tag from documents in a dataset
|
||||
func (s *KnowledgebaseService) RemoveTag(condition map[string]interface{}, newValue map[string]interface{}, indexName, kbID string) error {
|
||||
return s.docEngine.UpdateDataset(context.Background(), condition, newValue, indexName, kbID)
|
||||
return s.docEngine.UpdateChunks(context.Background(), condition, newValue, indexName, kbID)
|
||||
}
|
||||
|
||||
// GetByID retrieves a knowledge base by ID
|
||||
|
||||
@@ -171,7 +171,7 @@ func (s *SkillIndexerService) IndexSkill(ctx context.Context, tenantID, spaceID
|
||||
|
||||
// For Infinity: ensure table exists with correct dimension BEFORE inserting
|
||||
if docEngine.GetType() == "infinity" {
|
||||
exists, _ := docEngine.TableExists(ctx, indexName)
|
||||
exists, _ := docEngine.ChunkStoreExists(ctx, indexName, "skill")
|
||||
if !exists {
|
||||
common.Info(fmt.Sprintf("Creating Infinity table with dimension %d", dimension))
|
||||
if err := s.createIndexWithDimension(ctx, tenantID, spaceID, docEngine, embdID, dimension); err != nil {
|
||||
@@ -252,7 +252,7 @@ func (s *SkillIndexerService) BatchIndexSkills(ctx context.Context, tenantID, sp
|
||||
if docEngine.GetType() == "infinity" {
|
||||
// For Infinity: must ensure table exists with correct dimension BEFORE inserting
|
||||
common.Info(fmt.Sprintf("Checking if index exists: %s", indexName))
|
||||
exists, err := docEngine.TableExists(ctx, indexName)
|
||||
exists, err := docEngine.ChunkStoreExists(ctx, indexName, "skill")
|
||||
if err != nil {
|
||||
common.Warn(fmt.Sprintf("Error checking index existence: %v", err))
|
||||
}
|
||||
@@ -436,10 +436,10 @@ func (s *SkillIndexerService) ReindexAll(ctx context.Context, tenantID, spaceID
|
||||
|
||||
// Delete existing index and recreate with new dimension (for both ES and Infinity)
|
||||
indexName := getSkillIndexName(tenantID, spaceID)
|
||||
exists, _ := docEngine.TableExists(ctx, indexName)
|
||||
exists, _ := docEngine.ChunkStoreExists(ctx, indexName, "skill")
|
||||
if exists {
|
||||
common.Info(fmt.Sprintf("ReindexAll: deleting existing index %s", indexName))
|
||||
if err := docEngine.DropTable(ctx, indexName); err != nil {
|
||||
if err := docEngine.DropChunkStore(ctx, indexName, "skill"); err != nil {
|
||||
common.Warn(fmt.Sprintf("ReindexAll: failed to delete existing index: %v", err))
|
||||
}
|
||||
}
|
||||
@@ -846,7 +846,7 @@ func (s *SkillIndexerService) InitializeIndex(ctx context.Context, tenantID, spa
|
||||
|
||||
common.Info("Checking skill index existence", zap.String("indexName", indexName), zap.String("tenantID", tenantID), zap.String("spaceID", spaceID))
|
||||
|
||||
exists, err := docEngine.TableExists(ctx, indexName)
|
||||
exists, err := docEngine.ChunkStoreExists(ctx, indexName, "skill")
|
||||
if err != nil {
|
||||
common.Error("Failed to check index existence", err)
|
||||
return fmt.Errorf("failed to check index existence: %w", err)
|
||||
@@ -883,22 +883,22 @@ func (s *SkillIndexerService) createIndexWithDimension(ctx context.Context, tena
|
||||
|
||||
// For Infinity: check if table exists and needs recreation (dimension mismatch)
|
||||
if docEngine.GetType() == "infinity" {
|
||||
exists, err := docEngine.TableExists(ctx, indexName)
|
||||
exists, err := docEngine.ChunkStoreExists(ctx, indexName, "skill")
|
||||
if err != nil {
|
||||
common.Warn(fmt.Sprintf("Error checking if index exists: %v", err))
|
||||
}
|
||||
if exists {
|
||||
common.Info(fmt.Sprintf("Index exists, deleting for recreation with dimension %d", dimension),
|
||||
zap.String("indexName", indexName))
|
||||
if err := docEngine.DropTable(ctx, indexName); err != nil {
|
||||
if err := docEngine.DropChunkStore(ctx, indexName, "skill"); err != nil {
|
||||
common.Warn(fmt.Sprintf("Failed to delete existing index: %v", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Use the doc engine's CreateDataset method with skill-specific mapping
|
||||
// Use the doc engine's CreateChunkStore method with skill-specific mapping
|
||||
// The mapping file is loaded from conf/skill_es_mapping.json or conf/skill_infinity_mapping.json
|
||||
err := docEngine.CreateDataset(ctx, indexName, "skill", dimension, "")
|
||||
err := docEngine.CreateChunkStore(ctx, indexName, "skill", dimension, "")
|
||||
if err != nil {
|
||||
common.Error("Failed to create skill index", err)
|
||||
return err
|
||||
|
||||
@@ -226,7 +226,7 @@ func (s *SkillSearchService) Search(ctx context.Context, req *SearchRequest, doc
|
||||
indexName := getSkillIndexName(req.TenantID, req.SpaceID)
|
||||
common.Debug("Searching skills", zap.String("indexName", indexName), zap.String("query", req.Query))
|
||||
|
||||
indexExists, err := docEngine.TableExists(ctx, indexName)
|
||||
indexExists, err := docEngine.ChunkStoreExists(ctx, indexName, "skill")
|
||||
if err != nil {
|
||||
common.Error("Failed to check index existence", err)
|
||||
return nil, common.CodeOperatingError, fmt.Errorf("failed to check index existence: %w", err)
|
||||
|
||||
@@ -523,7 +523,7 @@ func (s *SkillSpaceService) asyncDeleteSpace(spaceID, folderID, tenantID string,
|
||||
indexName := getSkillIndexName(tenantID, spaceID)
|
||||
common.Info("Async deleting space index", zap.String("index", indexName), zap.String("spaceID", spaceID))
|
||||
deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
if err := docEngine.DropTable(deleteCtx, indexName); err != nil {
|
||||
if err := docEngine.DropChunkStore(deleteCtx, indexName, "skill"); err != nil {
|
||||
common.Warn("Failed to delete space index during async delete", zap.String("index", indexName), zap.Error(err))
|
||||
// Continue with other cleanup steps
|
||||
} else {
|
||||
|
||||
@@ -267,11 +267,8 @@ func (s *TenantService) GetTenantList(userID string) ([]*TenantListItem, error)
|
||||
|
||||
// CreateMetadataInDocEngine creates the document metadata table for a tenant
|
||||
func (s *TenantService) CreateMetadataInDocEngine(tenantID string) (common.ErrorCode, error) {
|
||||
// Build table name: ragflow_doc_meta_<tenant_id>
|
||||
tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
|
||||
|
||||
// Call document engine to create doc meta table
|
||||
err := s.docEngine.CreateMetadata(context.Background(), tableName)
|
||||
err := s.docEngine.CreateMetadataStore(context.Background(), tenantID)
|
||||
if err != nil {
|
||||
return common.CodeServerError, fmt.Errorf("failed to create metadata table: %w", err)
|
||||
}
|
||||
@@ -281,11 +278,8 @@ func (s *TenantService) CreateMetadataInDocEngine(tenantID string) (common.Error
|
||||
|
||||
// DeleteMetadataInDocEngine deletes the document metadata table for a tenant
|
||||
func (s *TenantService) DeleteMetadataInDocEngine(tenantID string) (common.ErrorCode, error) {
|
||||
// Build table name: ragflow_doc_meta_<tenant_id>
|
||||
tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
|
||||
|
||||
// Call document engine to delete doc meta table
|
||||
err := s.docEngine.DropTable(context.Background(), tableName)
|
||||
err := s.docEngine.DropMetadataStore(context.Background(), tenantID)
|
||||
if err != nil {
|
||||
return common.CodeServerError, fmt.Errorf("failed to delete doc meta table: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user