From c960dc2a4c541a784957226aa3cb2cdf138837cf Mon Sep 17 00:00:00 2001 From: qinling0210 <88864212+qinling0210@users.noreply.github.com> Date: Mon, 8 Jun 2026 11:49:37 +0800 Subject: [PATCH] Refine handling of POST /api/v1/datasets/search in GO (#15583) ### What problem does this PR solve? Refine handling of POST /api/v1/datasets/search in GO ### Type of change - [x] Refactoring --- api/apps/services/dataset_api_service.py | 6 +- api/db/services/doc_metadata_service.py | 52 +- cmd/admin_server.go | 4 +- cmd/ingestion_server.go | 12 +- cmd/ragflow_cli.go | 2 +- cmd/server_main.go | 12 +- common/doc_store/infinity_conn_base.py | 11 + go.mod | 2 +- internal/cli/README.md | 3 + internal/cli/cli.go | 63 +- internal/cli/client.go | 7 +- internal/cli/lexer.go | 6 + internal/cli/parser.go | 65 +- internal/cli/types.go | 2 + internal/cli/user_command.go | 66 +- internal/cli/user_parser.go | 157 +- internal/common/constants.go | 5 + internal/common/float.go | 135 +- internal/common/libm_cgo.go | 47 + internal/common/libm_purego.go | 45 + internal/common/logger.go | 16 +- internal/cpp/darts_trie.cpp | 1 + internal/dao/kb.go | 51 +- internal/dao/tenant.go | 2 +- internal/dao/tenant_model.go | 18 + internal/development.md | 41 +- internal/engine/elasticsearch/chunk.go | 1770 ++++++++++------- internal/engine/elasticsearch/chunk_test.go | 433 ++++ internal/engine/elasticsearch/client.go | 2 +- internal/engine/elasticsearch/meta_filter.go | 682 +++++++ .../engine/elasticsearch/meta_filter_test.go | 213 ++ internal/engine/elasticsearch/metadata.go | 361 +++- internal/engine/engine.go | 11 +- internal/engine/infinity/chunk.go | 613 ++++-- internal/engine/infinity/common.go | 11 + internal/engine/infinity/meta_filter.go | 582 ++++++ internal/engine/infinity/meta_filter_test.go | 290 +++ internal/engine/infinity/metadata.go | 352 +++- internal/engine/types/types.go | 57 + internal/entity/models/builtin.go | 192 ++ internal/entity/tenant.go | 2 +- internal/handler/chunk.go | 95 +- internal/handler/datasets.go | 80 + internal/handler/searchbot.go | 3 +- internal/router/router.go | 2 +- internal/service/chat_session.go | 34 +- internal/service/chunk.go | 563 +----- internal/service/dataset.go | 426 +++- internal/service/document.go | 40 +- internal/service/generator.go | 46 +- internal/service/load_prompt.go | 24 +- internal/service/load_prompt_test.go | 180 ++ internal/service/metadata.go | 72 +- internal/service/metadata_filter.go | 446 ++++- internal/service/metadata_filter_test.go | 135 ++ internal/service/model_service.go | 353 +++- internal/service/nlp/query_builder.go | 34 +- internal/service/nlp/query_builder_test.go | 26 - internal/service/nlp/reranker.go | 441 +++- internal/service/nlp/retrieval.go | 269 ++- internal/service/nlp/synonym.go | 4 + internal/service/nlp/term_weight.go | 45 +- internal/service/search.go | 4 +- internal/service/tenant.go | 14 +- .../tokenizer/tokenizer_concurrent_test.go | 2 +- rag/nlp/search.py | 4 +- rag/prompts/generator.py | 6 +- test/testcases/configs.py | 4 + test/testcases/conftest.py | 147 +- .../test_search_datasets_consistency.py | 594 ++++++ 70 files changed, 8580 insertions(+), 1915 deletions(-) create mode 100644 internal/common/libm_cgo.go create mode 100644 internal/common/libm_purego.go create mode 100644 internal/engine/elasticsearch/chunk_test.go create mode 100644 internal/engine/elasticsearch/meta_filter.go create mode 100644 internal/engine/elasticsearch/meta_filter_test.go create mode 100644 internal/engine/infinity/meta_filter.go create mode 100644 internal/engine/infinity/meta_filter_test.go create mode 100644 internal/entity/models/builtin.go create mode 100644 internal/service/load_prompt_test.go create mode 100644 test/testcases/restful_api/test_search_datasets_consistency.py diff --git a/api/apps/services/dataset_api_service.py b/api/apps/services/dataset_api_service.py index 3994316123..6cbc9770d7 100644 --- a/api/apps/services/dataset_api_service.py +++ b/api/apps/services/dataset_api_service.py @@ -1058,9 +1058,8 @@ async def search(dataset_id: str, tenant_id: str, req: dict): ranks["chunks"].insert(0, ck) except Exception: logging.warning("search KG retrieval failed: dataset=%s tenant=%s", dataset_id, tenant_id, exc_info=True) - total = ranks.get("total", 0) ranks["chunks"] = settings.retriever.retrieval_by_children(ranks["chunks"], tenant_ids) - ranks["total"] = total + ranks["total"] = len(ranks["chunks"]) for c in ranks["chunks"]: c.pop("vector", None) @@ -1430,9 +1429,8 @@ async def search_datasets(tenant_id: str, req: dict): ranks["chunks"].insert(0, ck) except Exception: logging.warning("search_datasets KG retrieval failed: datasets=%s tenant=%s", kb_ids, tenant_id, exc_info=True) - total = ranks.get("total", 0) ranks["chunks"] = settings.retriever.retrieval_by_children(ranks["chunks"], tenant_ids) - ranks["total"] = total + ranks["total"] = len(ranks["chunks"]) for c in ranks["chunks"]: c.pop("vector", None) diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py index fbe32f9e5b..ccc66925bd 100644 --- a/api/db/services/doc_metadata_service.py +++ b/api/db/services/doc_metadata_service.py @@ -24,7 +24,7 @@ import json import logging import re from copy import deepcopy -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from api.db.db_models import DB, Document from common import settings @@ -33,6 +33,36 @@ from api.db.db_models import Knowledgebase from common.doc_store.doc_store_base import OrderByExpr +def _es_response_total(response: Any) -> Optional[int]: + """Extract the exact total hit count from an ES search response. + + Returns ``None`` when the field is missing or in an unexpected shape + — callers should treat that as "cannot verify" rather than "no + overflow". + """ + if not isinstance(response, dict): + try: + response = dict(response) + except Exception: + return None + hits = response.get("hits") + if not isinstance(hits, dict): + return None + total = hits.get("total") + if isinstance(total, dict): + value = total.get("value") + if isinstance(value, bool): + return None + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + elif isinstance(total, int): + # Legacy shape: some clients return the count directly. + return total + return None + + class DocMetadataService: """Service for managing document metadata in ES/Infinity""" @@ -876,6 +906,10 @@ class DocMetadataService: **query_body, "size": limit, "_source": ["id"], + # Make hits.total.value exact. ES otherwise caps the tracked + # total at 10,000 with relation="gte", which would let + # overflow slip through undetected. + "track_total_hits": True, } try: @@ -898,6 +932,22 @@ class DocMetadataService: f"ES metadata filter hit limit {limit} for KBs {kb_ids}" ) + # Detect silent truncation: the push-down is a fast path, not + # the system of record. When the query matched more than + # ``limit`` docs, the slice we built here is necessarily a + # strict subset of the truth, and the caller treats any + # non-None result as definitive. Bail out and let the caller + # fall back to the in-memory ``meta_filter`` (correct, just + # slower for very large result sets) instead of silently + # dropping docs. + total = _es_response_total(response) + if total is not None and total > limit: + logging.warning( + f"ES metadata filter result exceeds push-down cap, falling back to in-memory: " + f"total={total}, cap={limit}, kb_ids={kb_ids}" + ) + return None + logging.debug(f"ES metadata filter returned {len(unique)} matches for KBs {kb_ids}") return unique diff --git a/cmd/admin_server.go b/cmd/admin_server.go index 46c695fa60..940606ab93 100644 --- a/cmd/admin_server.go +++ b/cmd/admin_server.go @@ -45,7 +45,7 @@ func main() { flag.Parse() // Initialize logger - if err := common.Init("info"); err != nil { + if err := common.Init("info", "admin_server.log"); err != nil { panic("failed to initialize logger: " + err.Error()) } @@ -59,7 +59,7 @@ func main() { // Reinitialize logger with configured level if different if cfg.Log.Level != "" && cfg.Log.Level != "info" { - if err := common.Init(cfg.Log.Level); err != nil { + if err := common.Init(cfg.Log.Level, "admin_server.log"); err != nil { common.Error("Failed to reinitialize logger with configured level", err) } } diff --git a/cmd/ingestion_server.go b/cmd/ingestion_server.go index 94a45f129f..a46a70e40f 100644 --- a/cmd/ingestion_server.go +++ b/cmd/ingestion_server.go @@ -72,7 +72,7 @@ func main() { flag.Parse() // Initialize logger with default level - if err := common.Init("info"); err != nil { + if err := common.Init("info", "ingestion_server.log"); err != nil { panic(fmt.Sprintf("Failed to initialize logger: %v", err)) } @@ -96,10 +96,12 @@ func main() { } // Reinitialize logger with configured level if different - if config.Log.Level != "" && config.Log.Level != "info" { - if err := common.Init(config.Log.Level); err != nil { - common.Error("Failed to reinitialize logger with configured level", err) - } + level := config.Log.Level + if level == "" { + level = "info" + } + if err := common.Init(level, "ingestion_server.log"); err != nil { + common.Error("Failed to reinitialize logger", err) } server.SetLogger(common.Logger) diff --git a/cmd/ragflow_cli.go b/cmd/ragflow_cli.go index d8303ca9b0..f4e33c1e7c 100644 --- a/cmd/ragflow_cli.go +++ b/cmd/ragflow_cli.go @@ -39,7 +39,7 @@ func main() { if args.Verbose { logLevel = "info" } - if err = common.Init(logLevel); err != nil { + if err = common.Init(logLevel, ""); err != nil { fmt.Printf("Warning: Failed to initialize logger: %v\n", err) } diff --git a/cmd/server_main.go b/cmd/server_main.go index c7a49b507a..f1b656d4d3 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -71,7 +71,7 @@ func main() { // Initialize logger with default level // logger.Init("info"); // set debug log level - if err := common.Init("info"); err != nil { + if err := common.Init("info", "server_main.log"); err != nil { panic(fmt.Sprintf("Failed to initialize logger: %v", err)) } @@ -92,10 +92,12 @@ func main() { } // Reinitialize logger with configured level if different - if config.Log.Level != "" && config.Log.Level != "info" { - if err := common.Init(config.Log.Level); err != nil { - common.Error("Failed to reinitialize logger with configured level", err) - } + level := config.Log.Level + if level == "" { + level = "info" + } + if err := common.Init(level, "server_main.log"); err != nil { + common.Error("Failed to reinitialize logger", err) } server.SetLogger(common.Logger) if config.Log.Level == "" { diff --git a/common/doc_store/infinity_conn_base.py b/common/doc_store/infinity_conn_base.py index af8493b82b..169bd85e87 100644 --- a/common/doc_store/infinity_conn_base.py +++ b/common/doc_store/infinity_conn_base.py @@ -546,6 +546,17 @@ class InfinityConnectionBase(DocStoreConnection): except Exception as e: self.logger.warning(f"Failed to create index on kb_id for {table_name}: {e}") + # Create secondary index on meta_fields for metadata filter queries + try: + inf_table.create_index( + f"idx_{table_name}_meta_fields", + IndexInfo("meta_fields", IndexType.Secondary), + ConflictType.Ignore, + ) + self.logger.debug(f"INFINITY created secondary index on meta_fields for table {table_name}") + except Exception as e: + self.logger.warning(f"Failed to create index on meta_fields for {table_name}: {e}") + self.logger.debug(f"INFINITY created document metadata table {table_name} with secondary indexes") return True diff --git a/go.mod b/go.mod index 5d748842a6..54ce2bee7a 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/google/uuid v1.6.0 github.com/infiniflow/infinity-go-sdk v0.0.0-00010101000000-000000000000 github.com/iromli/go-itsdangerous v0.0.0-20220223194502-9c8bef8dac6a + github.com/json-iterator/go v1.1.12 github.com/minio/minio-go/v7 v7.0.99 github.com/peterh/liner v1.2.2 github.com/redis/go-redis/v9 v9.18.0 @@ -78,7 +79,6 @@ require ( github.com/hashicorp/hcl v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.2 // indirect github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/klauspost/crc32 v1.3.0 // indirect diff --git a/internal/cli/README.md b/internal/cli/README.md index f55dc21e14..0dcb8ccf30 100644 --- a/internal/cli/README.md +++ b/internal/cli/README.md @@ -103,6 +103,9 @@ search "machine learning" # Search all datasets (JSON output) search "neural networks" datasets/kb1 # Search in kb1 search "AI" datasets/kb1 --output plain # Plain text output search "RAG" -n 20 # Return 20 results +SEARCH 'machine learning' ON DATASETS 'kb1' 'kb2' +SEARCH 'AI' ON DATASETS 'kb1' WITH top_k 1024 similarity_threshold 0.0 vector_similarity_weight 0.3 keyword true +SEARCH 'AI' ON DATASETS 'kb1' WITH cross_languages ['Chinese'] ``` #### `cat ` - Display content diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 770ca98341..42a9b37910 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -421,13 +421,13 @@ const historyFileName = ".ragflow_cli_history" // CLI represents the command line interface type CLI struct { - client *RAGFlowClient - contextEngine *filesystem.Engine - prompt string - running bool - line *liner.State - args *ConnectionArgs - outputFormat OutputFormat // Output format + client *RAGFlowClient + contextEngine *filesystem.Engine + prompt string + running bool + line *liner.State + args *ConnectionArgs + outputFormat OutputFormat // Output format } // NewCLI creates a new CLI instance @@ -1072,13 +1072,13 @@ func (c *CLI) printSkillSearchResults(result *filesystem.Result, format OutputFo // Skill search result structure type skillSearchResult struct { - SkillID string `json:"skill_id"` - Name string `json:"name"` - Description string `json:"description"` - Tags string `json:"tags"` - Score float64 `json:"score"` - BM25Score float64 `json:"bm25_score"` - VectorScore float64 `json:"vector_score"` + SkillID string `json:"skill_id"` + Name string `json:"name"` + Description string `json:"description"` + Tags string `json:"tags"` + Score float64 `json:"score"` + BM25Score float64 `json:"bm25_score"` + VectorScore float64 `json:"vector_score"` } results := make([]skillSearchResult, 0, len(result.Nodes)) @@ -1457,6 +1457,41 @@ Examples: search "AI" datasets/kb1 # Search in kb1 search "RAG" skills/space1 -n 20 # Search skills in hub1, return 20 results search "data processing" skills # Search skills (default space) + +Datasets syntax (full filter set): + search 'query' on datasets 'kb_names' [with