diff --git a/api/apps/services/dataset_api_service.py b/api/apps/services/dataset_api_service.py index 3994316123..6cbc9770d7 100644 --- a/api/apps/services/dataset_api_service.py +++ b/api/apps/services/dataset_api_service.py @@ -1058,9 +1058,8 @@ async def search(dataset_id: str, tenant_id: str, req: dict): ranks["chunks"].insert(0, ck) except Exception: logging.warning("search KG retrieval failed: dataset=%s tenant=%s", dataset_id, tenant_id, exc_info=True) - total = ranks.get("total", 0) ranks["chunks"] = settings.retriever.retrieval_by_children(ranks["chunks"], tenant_ids) - ranks["total"] = total + ranks["total"] = len(ranks["chunks"]) for c in ranks["chunks"]: c.pop("vector", None) @@ -1430,9 +1429,8 @@ async def search_datasets(tenant_id: str, req: dict): ranks["chunks"].insert(0, ck) except Exception: logging.warning("search_datasets KG retrieval failed: datasets=%s tenant=%s", kb_ids, tenant_id, exc_info=True) - total = ranks.get("total", 0) ranks["chunks"] = settings.retriever.retrieval_by_children(ranks["chunks"], tenant_ids) - ranks["total"] = total + ranks["total"] = len(ranks["chunks"]) for c in ranks["chunks"]: c.pop("vector", None) diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py index fbe32f9e5b..ccc66925bd 100644 --- a/api/db/services/doc_metadata_service.py +++ b/api/db/services/doc_metadata_service.py @@ -24,7 +24,7 @@ import json import logging import re from copy import deepcopy -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from api.db.db_models import DB, Document from common import settings @@ -33,6 +33,36 @@ from api.db.db_models import Knowledgebase from common.doc_store.doc_store_base import OrderByExpr +def _es_response_total(response: Any) -> Optional[int]: + """Extract the exact total hit count from an ES search response. + + Returns ``None`` when the field is missing or in an unexpected shape + — callers should treat that as "cannot verify" rather than "no + overflow". + """ + if not isinstance(response, dict): + try: + response = dict(response) + except Exception: + return None + hits = response.get("hits") + if not isinstance(hits, dict): + return None + total = hits.get("total") + if isinstance(total, dict): + value = total.get("value") + if isinstance(value, bool): + return None + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + elif isinstance(total, int): + # Legacy shape: some clients return the count directly. + return total + return None + + class DocMetadataService: """Service for managing document metadata in ES/Infinity""" @@ -876,6 +906,10 @@ class DocMetadataService: **query_body, "size": limit, "_source": ["id"], + # Make hits.total.value exact. ES otherwise caps the tracked + # total at 10,000 with relation="gte", which would let + # overflow slip through undetected. + "track_total_hits": True, } try: @@ -898,6 +932,22 @@ class DocMetadataService: f"ES metadata filter hit limit {limit} for KBs {kb_ids}" ) + # Detect silent truncation: the push-down is a fast path, not + # the system of record. When the query matched more than + # ``limit`` docs, the slice we built here is necessarily a + # strict subset of the truth, and the caller treats any + # non-None result as definitive. Bail out and let the caller + # fall back to the in-memory ``meta_filter`` (correct, just + # slower for very large result sets) instead of silently + # dropping docs. + total = _es_response_total(response) + if total is not None and total > limit: + logging.warning( + f"ES metadata filter result exceeds push-down cap, falling back to in-memory: " + f"total={total}, cap={limit}, kb_ids={kb_ids}" + ) + return None + logging.debug(f"ES metadata filter returned {len(unique)} matches for KBs {kb_ids}") return unique diff --git a/cmd/admin_server.go b/cmd/admin_server.go index 46c695fa60..940606ab93 100644 --- a/cmd/admin_server.go +++ b/cmd/admin_server.go @@ -45,7 +45,7 @@ func main() { flag.Parse() // Initialize logger - if err := common.Init("info"); err != nil { + if err := common.Init("info", "admin_server.log"); err != nil { panic("failed to initialize logger: " + err.Error()) } @@ -59,7 +59,7 @@ func main() { // Reinitialize logger with configured level if different if cfg.Log.Level != "" && cfg.Log.Level != "info" { - if err := common.Init(cfg.Log.Level); err != nil { + if err := common.Init(cfg.Log.Level, "admin_server.log"); err != nil { common.Error("Failed to reinitialize logger with configured level", err) } } diff --git a/cmd/ingestion_server.go b/cmd/ingestion_server.go index 94a45f129f..a46a70e40f 100644 --- a/cmd/ingestion_server.go +++ b/cmd/ingestion_server.go @@ -72,7 +72,7 @@ func main() { flag.Parse() // Initialize logger with default level - if err := common.Init("info"); err != nil { + if err := common.Init("info", "ingestion_server.log"); err != nil { panic(fmt.Sprintf("Failed to initialize logger: %v", err)) } @@ -96,10 +96,12 @@ func main() { } // Reinitialize logger with configured level if different - if config.Log.Level != "" && config.Log.Level != "info" { - if err := common.Init(config.Log.Level); err != nil { - common.Error("Failed to reinitialize logger with configured level", err) - } + level := config.Log.Level + if level == "" { + level = "info" + } + if err := common.Init(level, "ingestion_server.log"); err != nil { + common.Error("Failed to reinitialize logger", err) } server.SetLogger(common.Logger) diff --git a/cmd/ragflow_cli.go b/cmd/ragflow_cli.go index d8303ca9b0..f4e33c1e7c 100644 --- a/cmd/ragflow_cli.go +++ b/cmd/ragflow_cli.go @@ -39,7 +39,7 @@ func main() { if args.Verbose { logLevel = "info" } - if err = common.Init(logLevel); err != nil { + if err = common.Init(logLevel, ""); err != nil { fmt.Printf("Warning: Failed to initialize logger: %v\n", err) } diff --git a/cmd/server_main.go b/cmd/server_main.go index c7a49b507a..f1b656d4d3 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -71,7 +71,7 @@ func main() { // Initialize logger with default level // logger.Init("info"); // set debug log level - if err := common.Init("info"); err != nil { + if err := common.Init("info", "server_main.log"); err != nil { panic(fmt.Sprintf("Failed to initialize logger: %v", err)) } @@ -92,10 +92,12 @@ func main() { } // Reinitialize logger with configured level if different - if config.Log.Level != "" && config.Log.Level != "info" { - if err := common.Init(config.Log.Level); err != nil { - common.Error("Failed to reinitialize logger with configured level", err) - } + level := config.Log.Level + if level == "" { + level = "info" + } + if err := common.Init(level, "server_main.log"); err != nil { + common.Error("Failed to reinitialize logger", err) } server.SetLogger(common.Logger) if config.Log.Level == "" { diff --git a/common/doc_store/infinity_conn_base.py b/common/doc_store/infinity_conn_base.py index af8493b82b..169bd85e87 100644 --- a/common/doc_store/infinity_conn_base.py +++ b/common/doc_store/infinity_conn_base.py @@ -546,6 +546,17 @@ class InfinityConnectionBase(DocStoreConnection): except Exception as e: self.logger.warning(f"Failed to create index on kb_id for {table_name}: {e}") + # Create secondary index on meta_fields for metadata filter queries + try: + inf_table.create_index( + f"idx_{table_name}_meta_fields", + IndexInfo("meta_fields", IndexType.Secondary), + ConflictType.Ignore, + ) + self.logger.debug(f"INFINITY created secondary index on meta_fields for table {table_name}") + except Exception as e: + self.logger.warning(f"Failed to create index on meta_fields for {table_name}: {e}") + self.logger.debug(f"INFINITY created document metadata table {table_name} with secondary indexes") return True diff --git a/go.mod b/go.mod index 5d748842a6..54ce2bee7a 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/google/uuid v1.6.0 github.com/infiniflow/infinity-go-sdk v0.0.0-00010101000000-000000000000 github.com/iromli/go-itsdangerous v0.0.0-20220223194502-9c8bef8dac6a + github.com/json-iterator/go v1.1.12 github.com/minio/minio-go/v7 v7.0.99 github.com/peterh/liner v1.2.2 github.com/redis/go-redis/v9 v9.18.0 @@ -78,7 +79,6 @@ require ( github.com/hashicorp/hcl v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.2 // indirect github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/klauspost/crc32 v1.3.0 // indirect diff --git a/internal/cli/README.md b/internal/cli/README.md index f55dc21e14..0dcb8ccf30 100644 --- a/internal/cli/README.md +++ b/internal/cli/README.md @@ -103,6 +103,9 @@ search "machine learning" # Search all datasets (JSON output) search "neural networks" datasets/kb1 # Search in kb1 search "AI" datasets/kb1 --output plain # Plain text output search "RAG" -n 20 # Return 20 results +SEARCH 'machine learning' ON DATASETS 'kb1' 'kb2' +SEARCH 'AI' ON DATASETS 'kb1' WITH top_k 1024 similarity_threshold 0.0 vector_similarity_weight 0.3 keyword true +SEARCH 'AI' ON DATASETS 'kb1' WITH cross_languages ['Chinese'] ``` #### `cat ` - Display content diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 770ca98341..42a9b37910 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -421,13 +421,13 @@ const historyFileName = ".ragflow_cli_history" // CLI represents the command line interface type CLI struct { - client *RAGFlowClient - contextEngine *filesystem.Engine - prompt string - running bool - line *liner.State - args *ConnectionArgs - outputFormat OutputFormat // Output format + client *RAGFlowClient + contextEngine *filesystem.Engine + prompt string + running bool + line *liner.State + args *ConnectionArgs + outputFormat OutputFormat // Output format } // NewCLI creates a new CLI instance @@ -1072,13 +1072,13 @@ func (c *CLI) printSkillSearchResults(result *filesystem.Result, format OutputFo // Skill search result structure type skillSearchResult struct { - SkillID string `json:"skill_id"` - Name string `json:"name"` - Description string `json:"description"` - Tags string `json:"tags"` - Score float64 `json:"score"` - BM25Score float64 `json:"bm25_score"` - VectorScore float64 `json:"vector_score"` + SkillID string `json:"skill_id"` + Name string `json:"name"` + Description string `json:"description"` + Tags string `json:"tags"` + Score float64 `json:"score"` + BM25Score float64 `json:"bm25_score"` + VectorScore float64 `json:"vector_score"` } results := make([]skillSearchResult, 0, len(result.Nodes)) @@ -1457,6 +1457,41 @@ Examples: search "AI" datasets/kb1 # Search in kb1 search "RAG" skills/space1 -n 20 # Search skills in hub1, return 20 results search "data processing" skills # Search skills (default space) + +Datasets syntax (full filter set): + search 'query' on datasets 'kb_names' [with