Files
maoyifeng 643cb4788f Go CLI: add response output (#16263)
### What problem does this PR solve?

Go CLI: add response output
2026-06-23 18:12:15 +08:00

414 lines
13 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"ragflow/internal/server"
"ragflow/internal/utility"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
// elasticsearchEngine is the Elasticsearch engine implementation.
// It pairs the go-elasticsearch client with the configuration the engine was
// created from; both fields are populated by NewEngine.
type elasticsearchEngine struct {
// client is the go-elasticsearch client used to execute the engine's requests.
client *elasticsearch.Client
// config is the Elasticsearch configuration that was passed to NewEngine.
config *server.ElasticsearchConfig
}
// NewEngine creates an Elasticsearch engine from cfg.
//
// cfg must be a non-nil *server.ElasticsearchConfig. The constructor builds the
// client, verifies connectivity with an Info request bounded by a 5-second
// timeout, and then creates (or updates) the two index templates used for
// chunk indices and doc_meta indices.
//
// It returns an error, and no engine, when the configuration is missing or of
// the wrong type, when the client cannot be created, when the connectivity
// check fails, or when either index template cannot be installed.
func NewEngine(cfg interface{}) (*elasticsearchEngine, error) {
	if cfg == nil {
		return nil, fmt.Errorf("elasticsearch config is nil, please check your configuration file for 'doc_engine.es' settings")
	}
	esConfig, ok := cfg.(*server.ElasticsearchConfig)
	if !ok {
		// Report the type that is actually asserted above, together with the
		// type that was received, so a mismatch can be diagnosed from the
		// message alone.
		return nil, fmt.Errorf("invalid Elasticsearch config type %T, expected *server.ElasticsearchConfig", cfg)
	}
	// A typed nil pointer wrapped in the interface passes both checks above,
	// so it needs its own guard.
	if esConfig == nil {
		return nil, fmt.Errorf("elasticsearch config is nil, please check your configuration file for 'doc_engine.es' settings")
	}

	// Create ES client
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{esConfig.Hosts},
		Username:  esConfig.Username,
		Password:  esConfig.Password,
		Transport: &http.Transport{
			MaxIdleConnsPerHost:   10,
			ResponseHeaderTimeout: 30 * time.Second,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err)
	}

	// Check connection
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	req := esapi.InfoRequest{}
	res, err := req.Do(ctx, client)
	if err != nil {
		return nil, fmt.Errorf("failed to ping Elasticsearch: %w", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return nil, fmt.Errorf("Elasticsearch returned error: %s", res.Status())
	}

	engine := &elasticsearchEngine{
		client: client,
		config: esConfig,
	}

	// Create two index templates for different index types.
	// Template for chunk indices (ragflow_*) - priority 1
	if err := engine.CreateIndexTemplate(context.Background(), "ragflow_mapping", "ragflow_*", "mapping.json", 1); err != nil {
		return nil, fmt.Errorf("failed to create chunk index template: %w", err)
	}
	// Template for doc_meta indices (ragflow_doc_meta_*) - priority 2.
	// These index names also match ragflow_*, so this template is given the
	// higher priority of the two.
	if err := engine.CreateIndexTemplate(context.Background(), "ragflow_doc_meta_mapping", "ragflow_doc_meta_*", "doc_meta_es_mapping.json", 2); err != nil {
		return nil, fmt.Errorf("failed to create doc_meta index template: %w", err)
	}
	return engine, nil
}
// GetType reports the identifier of this engine implementation, which is
// always "elasticsearch".
func (e *elasticsearchEngine) GetType() string {
	const engineType = "elasticsearch"
	return engineType
}
// Ping performs a health check by sending an Info request with ctx.
// A transport error is returned as-is; an error status from Elasticsearch is
// reported as an error carrying that status. nil means the request succeeded.
func (e *elasticsearchEngine) Ping(ctx context.Context) error {
	infoReq := esapi.InfoRequest{}
	resp, err := infoReq.Do(ctx, e.client)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		return nil
	}
	return fmt.Errorf("elasticsearch ping failed: %s", resp.Status())
}
// Close closes the connection.
// In this implementation it performs no work and always returns nil.
func (e *elasticsearchEngine) Close() error {
// The go-elasticsearch client doesn't have a Close method; the connection is
// managed by the transport, so there is nothing to release here.
return nil
}
// CreateIndexTemplate creates or updates an index template from a mapping file.
// The template will be automatically applied to any new index matching the
// pattern.
//
// Parameters:
//   - ctx: context used for the template request.
//   - templateName: name of the index template; must not be empty.
//   - indexPattern: index pattern the template applies to; must not be empty.
//   - mappingFileName: JSON file under <project root>/conf holding optional
//     top-level "settings" and "mappings" objects; defaults to "mapping.json"
//     when empty.
//   - priority: optional template priority; only the first value is used and
//     it defaults to 1.
//
// It returns an error when the arguments are invalid, the mapping file cannot
// be read or parsed, the request fails, Elasticsearch answers with an error
// status, or the response does not acknowledge the template.
func (e *elasticsearchEngine) CreateIndexTemplate(ctx context.Context, templateName, indexPattern, mappingFileName string, priority ...int) error {
	if templateName == "" || indexPattern == "" {
		return fmt.Errorf("template name and index pattern cannot be empty")
	}
	p := 1
	if len(priority) > 0 {
		p = priority[0]
	}
	if mappingFileName == "" {
		mappingFileName = "mapping.json"
	}

	// Read mapping from file
	mappingPath := filepath.Join(utility.GetProjectRoot(), "conf", mappingFileName)
	data, err := os.ReadFile(mappingPath)
	if err != nil {
		return fmt.Errorf("failed to read mapping file: %w", err)
	}
	var mapping map[string]interface{}
	if err := json.Unmarshal(data, &mapping); err != nil {
		return fmt.Errorf("failed to parse mapping file: %w", err)
	}

	// Copy only the sections the mapping file actually provides. A missing
	// (or null) section is left out of the template instead of being sent as
	// an explicit JSON null.
	template := make(map[string]interface{}, 2)
	for _, section := range []string{"settings", "mappings"} {
		if value, ok := mapping[section]; ok && value != nil {
			template[section] = value
		}
	}

	// Build template body with proper structure
	templateBody := map[string]interface{}{
		"index_patterns": []string{indexPattern},
		"priority":       p, // Configurable priority to override existing templates
		"template":       template,
	}
	templateBytes, err := json.Marshal(templateBody)
	if err != nil {
		return fmt.Errorf("failed to marshal template: %w", err)
	}

	// Create or update template
	req := esapi.IndicesPutIndexTemplateRequest{
		Name: templateName,
		Body: bytes.NewReader(templateBytes),
	}
	res, err := req.Do(ctx, e.client)
	if err != nil {
		return fmt.Errorf("failed to create index template: %w", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		// Best effort: the body only enriches the error message, so a failed
		// read is deliberately ignored and the status alone is reported.
		bodyBytes, _ := io.ReadAll(res.Body)
		return fmt.Errorf("failed to create index template: %s, body: %s", res.Status(), string(bodyBytes))
	}

	var result map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("failed to parse response: %w", err)
	}
	// A missing, non-boolean, or false "acknowledged" field counts as failure.
	if acknowledged, ok := result["acknowledged"].(bool); !ok || !acknowledged {
		return fmt.Errorf("index template creation not acknowledged")
	}
	return nil
}
// GetClusterStats gets Elasticsearch cluster statistics and flattens the
// fields of interest into a single-level map.
// Reference: curl -XGET "http://{es_host}/_cluster/stats" -H "kbn-xsrf: reporting"
//
// Only fields that are present in the response with the expected JSON type are
// copied; anything else is silently skipped. Byte counts are rendered through
// convertBytes, counters are converted to int / int64, and the memory usage
// percentage is kept as a float64.
//
// An error is returned when the request fails, Elasticsearch answers with an
// error status, or the response body cannot be decoded.
func (e *elasticsearchEngine) GetClusterStats() (map[string]interface{}, error) {
	statsReq := esapi.ClusterStatsRequest{}
	resp, err := statsReq.Do(context.Background(), e.client)
	if err != nil {
		return nil, fmt.Errorf("failed to get cluster stats: %w", err)
	}
	defer resp.Body.Close()
	if resp.IsError() {
		return nil, fmt.Errorf("elasticsearch cluster stats returned error: %s", resp.Status())
	}

	var raw map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&raw); err != nil {
		return nil, fmt.Errorf("failed to decode cluster stats: %w", err)
	}

	stats := make(map[string]interface{})

	// child returns parent[key] as a JSON object, or a nil map when the key is
	// absent or holds another type. Reading from a nil map is safe, so every
	// lookup below it simply finds nothing.
	child := func(parent map[string]interface{}, key string) map[string]interface{} {
		obj, _ := parent[key].(map[string]interface{})
		return obj
	}
	// number copies the numeric field src[key] into stats[as] after applying
	// convert; non-numeric or missing fields are skipped.
	number := func(src map[string]interface{}, key, as string, convert func(float64) interface{}) {
		if v, ok := src[key].(float64); ok {
			stats[as] = convert(v)
		}
	}
	// text copies the string field src[key] into stats[as].
	text := func(src map[string]interface{}, key, as string) {
		if v, ok := src[key].(string); ok {
			stats[as] = v
		}
	}
	toInt := func(v float64) interface{} { return int(v) }
	toInt64 := func(v float64) interface{} { return int64(v) }
	toSize := func(v float64) interface{} { return convertBytes(int64(v)) }
	asFloat := func(v float64) interface{} { return v }

	// Basic cluster info
	text(raw, "cluster_name", "cluster_name")
	text(raw, "status", "status")

	// Indices info
	indices := child(raw, "indices")
	number(indices, "count", "indices", toInt)
	number(child(indices, "shards"), "total", "indices_shards", toInt)

	docs := child(indices, "docs")
	number(docs, "count", "docs", toInt64)
	number(docs, "deleted", "docs_deleted", toInt64)

	store := child(indices, "store")
	number(store, "size_in_bytes", "store_size", toSize)
	number(store, "total_data_set_size_in_bytes", "total_dataset_size", toSize)

	mappings := child(indices, "mappings")
	number(mappings, "total_field_count", "mappings_fields", toInt)
	number(mappings, "total_deduplicated_field_count", "mappings_deduplicated_fields", toInt)
	number(mappings, "total_deduplicated_mapping_size_in_bytes", "mappings_deduplicated_size", toSize)

	// Nodes info
	nodes := child(raw, "nodes")
	number(child(nodes, "count"), "total", "nodes", toInt)
	if versions, ok := nodes["versions"].([]interface{}); ok {
		stats["nodes_version"] = versions
	}

	osMem := child(child(nodes, "os"), "mem")
	number(osMem, "total_in_bytes", "os_mem", toSize)
	number(osMem, "used_in_bytes", "os_mem_used", toSize)
	number(osMem, "used_percent", "os_mem_used_percent", asFloat)

	jvm := child(nodes, "jvm")
	// Only the first entry of the JVM versions list is reported.
	if jvmVersions, ok := jvm["versions"].([]interface{}); ok && len(jvmVersions) > 0 {
		if first, ok := jvmVersions[0].(map[string]interface{}); ok {
			text(first, "vm_version", "jvm_versions")
		}
	}
	jvmMem := child(jvm, "mem")
	number(jvmMem, "heap_used_in_bytes", "jvm_heap_used", toSize)
	number(jvmMem, "heap_max_in_bytes", "jvm_heap_max", toSize)

	return stats, nil
}
// convertBytes renders a byte count in human readable form.
//
// The largest binary unit (1024-based: kb, mb, gb, tb, pb) that does not
// exceed the value is chosen and the result is printed with two decimals,
// e.g. "1.50 kb". Values below 1024, including negative ones, are printed as
// a plain integer followed by " b".
func convertBytes(bytes int64) string {
	// Units ordered from largest to smallest so the first match wins.
	units := [...]struct {
		suffix string
		size   int64
	}{
		{"pb", 1 << 50},
		{"tb", 1 << 40},
		{"gb", 1 << 30},
		{"mb", 1 << 20},
		{"kb", 1 << 10},
	}
	for _, unit := range units {
		if bytes >= unit.size {
			return fmt.Sprintf("%.2f %s", float64(bytes)/float64(unit.size), unit.suffix)
		}
	}
	return fmt.Sprintf("%d b", bytes)
}
// extractErrorReason extracts the error reason from an Elasticsearch error
// response body. It tries to find the most specific error message available.
//
// Lookup order:
//  1. error.root_cause[0].reason
//  2. error.reason
//  3. error.failed_shards[0].reason.reason
//
// When "error" is a plain string instead of an object, that string is
// returned. An empty string is returned when the body is not valid JSON or no
// reason can be found.
func extractErrorReason(bodyBytes []byte) string {
	var errResp map[string]interface{}
	if err := json.Unmarshal(bodyBytes, &errResp); err != nil {
		return ""
	}

	errorObj, ok := errResp["error"].(map[string]interface{})
	if !ok {
		// Some responses carry the message directly, e.g.
		// {"error":"Incorrect HTTP method ...","status":405}.
		// Any other shape (missing, null, number, ...) yields "".
		message, _ := errResp["error"].(string)
		return message
	}

	// Try to get error from root_cause
	if rootCauses, ok := errorObj["root_cause"].([]interface{}); ok && len(rootCauses) > 0 {
		if rootCause, ok := rootCauses[0].(map[string]interface{}); ok {
			if reason, ok := rootCause["reason"].(string); ok && reason != "" {
				return reason
			}
		}
	}

	// Fallback to main error reason
	if reason, ok := errorObj["reason"].(string); ok && reason != "" {
		return reason
	}

	// Try failed_shards
	if failedShards, ok := errorObj["failed_shards"].([]interface{}); ok && len(failedShards) > 0 {
		if shard, ok := failedShards[0].(map[string]interface{}); ok {
			if reason, ok := shard["reason"].(map[string]interface{}); ok {
				if r, ok := reason["reason"].(string); ok && r != "" {
					return r
				}
			}
		}
	}
	return ""
}
// GetIndexStats gets statistics for the specified indices using the
// _cat/indices API.
//
// Each returned entry holds the requested columns: index, health, status,
// docs.count, store.size and dataset.size. An empty (non-nil) slice is
// returned when indices is empty or when Elasticsearch answers with 404.
// Any other error status, a failed request, or an undecodable body results in
// an error.
func (e *elasticsearchEngine) GetIndexStats(indices []string) ([]map[string]interface{}, error) {
	if len(indices) == 0 {
		return []map[string]interface{}{}, nil
	}

	catReq := esapi.CatIndicesRequest{
		Index:  indices,
		Format: "json",
		H:      []string{"index", "health", "status", "docs.count", "store.size", "dataset.size"},
	}
	resp, err := catReq.Do(context.Background(), e.client)
	if err != nil {
		return nil, fmt.Errorf("failed to get index stats: %w", err)
	}
	defer resp.Body.Close()

	switch {
	case !resp.IsError():
		var stats []map[string]interface{}
		if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
			return nil, fmt.Errorf("failed to decode index stats: %w", err)
		}
		return stats, nil
	case resp.StatusCode == http.StatusNotFound:
		// A 404 is reported to the caller as "no stats", not as an error.
		return []map[string]interface{}{}, nil
	default:
		// The body is only used to enrich the message; a read failure is ignored.
		rawBody, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("elasticsearch cat indices error: %s, body: %s", resp.Status(), string(rawBody))
	}
}