From 59c35100c56ab8954f1c4066cac512eb51bbf194 Mon Sep 17 00:00:00 2001 From: sxxtony <166789813+sxxtony@users.noreply.github.com> Date: Thu, 7 May 2026 16:23:43 +0300 Subject: [PATCH] Perf: push metadata filters down to Elasticsearch (#14576) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Fixes #14412. `common.metadata_utils.meta_filter` evaluates user-defined metadata conditions in Python after `DocMetadataService.get_flatted_meta_by_kbs` loads the entire `meta_fields` table into memory. Past a few thousand documents per knowledge base this becomes a memory bottleneck and a wasted ES round-trip — every filter request currently fetches up to 10000 metadata rows even when the resulting `doc_ids` list is tiny. This PR adds an ES push-down path that translates the same filter language into a `bool` query and returns just the matching document IDs. **Changes** - `common/metadata_es_filter.py` *(new)*: pure-Python translator from the RAGflow filter list to ES DSL. Covers every operator the in-memory path supports (`=`, `≠`, `>`, `<`, `≥`, `≤`, `in`, `not in`, `contains`, `not contains`, `start with`, `end with`, `empty`, `not empty`) with `case_insensitive: true` on `prefix` and `wildcard` for parity with the existing lower-cased Python comparisons. User wildcard metacharacters are escaped before being injected into `wildcard` patterns. Negative operators (`≠`, `not in`, `not contains`, ranges) are wrapped with an `exists` guard so they do not accidentally match documents missing the key, matching the legacy `if k not in metas` behaviour. - `api/db/services/doc_metadata_service.py`: new `DocMetadataService.filter_doc_ids_by_meta_pushdown(kb_ids, filters, logic)` that returns the doc IDs ES matched, or `None` to signal the caller should fall back to the in-memory path. Returns `None` when the active doc store is Infinity (`meta_fields` is a JSON column, not a dotted-object mapping), when any filter cannot be expressed in DSL (`UnsupportedMetaFilter`), or when the ES request or metadata index lookup errors. - `common/metadata_utils.py`: `apply_meta_data_filter` accepts an optional `kb_ids` argument. When supplied, conditions go through push-down first via a new `_try_meta_pushdown` helper; on `None` the function falls back to the original `meta_filter` call. Default behaviour is unchanged for callers that don't pass `kb_ids`. - Updated all four callers (`agent/tools/retrieval.py`, `api/db/services/dialog_service.py` ×2, `api/apps/services/dataset_api_service.py`, `api/apps/sdk/session.py`) to forward `kb_ids` so the push-down path is exercised in production. - `test/unit_test/common/test_metadata_es_filter.py` *(new)*: 35 unit tests covering every operator's DSL shape, value coercion (`ast.literal_eval`, lowercasing, ISO-date pass-through), wildcard escaping, OR-logic wrapping that protects negative clauses, and the doc-ID extractor. **Behaviour preserved** - The in-memory `meta_filter` is untouched and still services every fallback case (Infinity backend, unknown operators, ES outages). - The eligibility / credibility / issue-multiplier semantics described in the LLM-driven `auto` and `semi_auto` modes still hand the LLM the full in-memory `metas` dict to choose conditions from. Only the *evaluation* of those generated conditions is pushed down. - Existing tests in `test/unit_test/common/test_metadata_filter_operators.py` continue to pass (14/14). **Test plan** - `pytest test/unit_test/common/test_metadata_es_filter.py` — 35 passed. - `pytest test/unit_test/common/test_metadata_filter_operators.py` — 14 passed. - `ruff check` clean on every modified file. - Reviewer please validate the ES query shapes against a live cluster — particularly `case_insensitive` on `wildcard` and `prefix` (requires ES 7.10+) and the `exists` + `must_not` pairing for `≠`. **Notes** - The first cut caps each push-down request at 10000 results, matching the existing `get_flatted_meta_by_kbs` limit, and logs a warning when the cap is hit. A `search_after` follow-up would let us drop the cap entirely once the push-down path is validated. - Operator parity with the in-memory path is exact for the canonical unicode operators (`≥`, `≤`, `≠`) used internally; the ASCII aliases (`>=`, `<=`, `!=`) are normalised by `convert_conditions` before they reach the translator. ### Type of change - [x] Performance Improvement --------- Co-authored-by: sxxtony --- agent/tools/retrieval.py | 10 +- api/apps/sdk/session.py | 11 +- api/apps/services/dataset_api_service.py | 11 +- api/db/services/dialog_service.py | 27 +- api/db/services/doc_metadata_service.py | 104 ++++ common/metadata_es_filter.py | 580 ++++++++++++++++++ common/metadata_utils.py | 73 ++- .../test_session_sdk_routes_unit.py | 4 +- .../common/test_metadata_es_filter.py | 473 ++++++++++++++ 9 files changed, 1273 insertions(+), 20 deletions(-) create mode 100644 common/metadata_es_filter.py create mode 100644 test/unit_test/common/test_metadata_es_filter.py diff --git a/agent/tools/retrieval.py b/agent/tools/retrieval.py index 912a5c3485..4496f497ae 100644 --- a/agent/tools/retrieval.py +++ b/agent/tools/retrieval.py @@ -135,7 +135,11 @@ class Retrieval(ToolBase, ABC): doc_ids = [] if self._param.meta_data_filter != {}: - metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) + # Defer the (potentially expensive) metadata table load — manual + # filters served by ES push-down never need it. The loader is + # invoked at most once per request by ``apply_meta_data_filter``. + def _load_metas() -> dict: + return DocMetadataService.get_flatted_meta_by_kbs(kb_ids) def _resolve_manual_filter(flt: dict) -> dict: pat = re.compile(self.variable_ref_patt) @@ -174,11 +178,13 @@ class Retrieval(ToolBase, ABC): doc_ids = await apply_meta_data_filter( self._param.meta_data_filter, - metas, + None, query, chat_mdl, doc_ids, _resolve_manual_filter if self._param.meta_data_filter.get("method") == "manual" else None, + kb_ids=kb_ids, + metas_loader=_load_metas, ) if self._param.cross_languages: diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index 2cb4312991..b539272c0d 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -370,8 +370,15 @@ async def retrieval_test_embedded(): chat_mdl = LLMBundle(tenant_id, chat_model_config) if meta_data_filter: - metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) - local_doc_ids = await apply_meta_data_filter(meta_data_filter, metas, _question, chat_mdl, local_doc_ids) + local_doc_ids = await apply_meta_data_filter( + meta_data_filter, + None, + _question, + chat_mdl, + local_doc_ids, + kb_ids=kb_ids, + metas_loader=lambda: DocMetadataService.get_flatted_meta_by_kbs(kb_ids), + ) tenants = UserTenantService.query(user_id=tenant_id) for kb_id in kb_ids: diff --git a/api/apps/services/dataset_api_service.py b/api/apps/services/dataset_api_service.py index 93512ff09f..16418d83d8 100644 --- a/api/apps/services/dataset_api_service.py +++ b/api/apps/services/dataset_api_service.py @@ -974,8 +974,15 @@ async def search(dataset_id: str, tenant_id: str, req: dict): chat_mdl = LLMBundle(tenant_id, chat_model_config) if meta_data_filter: - metas = DocMetadataService.get_flatted_meta_by_kbs([dataset_id]) - local_doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, local_doc_ids) + local_doc_ids = await apply_meta_data_filter( + meta_data_filter, + None, + question, + chat_mdl, + local_doc_ids, + kb_ids=[dataset_id], + metas_loader=lambda: DocMetadataService.get_flatted_meta_by_kbs([dataset_id]), + ) tenant_ids = [] tenants = UserTenantService.query(user_id=tenant_id) diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index 09ca70c43a..04e9c691b3 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -616,13 +616,14 @@ async def async_chat(dialog, messages, stream=True, **kwargs): questions = [await cross_languages(dialog.tenant_id, dialog.llm_id, questions[0], prompt_config["cross_languages"])] if dialog.meta_data_filter: - metas = DocMetadataService.get_flatted_meta_by_kbs(dialog.kb_ids) attachments = await apply_meta_data_filter( dialog.meta_data_filter, - metas, + None, questions[-1], chat_mdl, attachments, + kb_ids=dialog.kb_ids, + metas_loader=lambda: DocMetadataService.get_flatted_meta_by_kbs(dialog.kb_ids), ) if prompt_config.get("keyword", False): @@ -1502,8 +1503,15 @@ async def async_ask(question, kb_ids, tenant_id, chat_llm_name=None, search_conf tenant_ids = list(set([kb.tenant_id for kb in kbs])) if meta_data_filter: - metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) - doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, doc_ids) + doc_ids = await apply_meta_data_filter( + meta_data_filter, + None, + question, + chat_mdl, + doc_ids, + kb_ids=kb_ids, + metas_loader=lambda: DocMetadataService.get_flatted_meta_by_kbs(kb_ids), + ) kbinfos = await retriever.retrieval( question=question, @@ -1595,8 +1603,15 @@ async def gen_mindmap(question, kb_ids, tenant_id, search_config={}): rerank_mdl = LLMBundle(tenant_id, rerank_model_config) if meta_data_filter: - metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) - doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, doc_ids) + doc_ids = await apply_meta_data_filter( + meta_data_filter, + None, + question, + chat_mdl, + doc_ids, + kb_ids=kb_ids, + metas_loader=lambda: DocMetadataService.get_flatted_meta_by_kbs(kb_ids), + ) ranks = await settings.retriever.retrieval( question=question, diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py index db05f4bb2d..1cf887c2d3 100644 --- a/api/db/services/doc_metadata_service.py +++ b/api/db/services/doc_metadata_service.py @@ -772,6 +772,110 @@ class DocMetadataService: logging.error(f"Error getting flattened metadata for KBs {kb_ids}: {e}") return {} + @classmethod + def filter_doc_ids_by_meta_pushdown( + cls, + kb_ids: List[str], + filters: List[Dict], + logic: str = "and", + limit: int = 10000, + ) -> Optional[List[str]]: + """Run a metadata filter directly against ES, returning matching doc IDs. + + Returns ``None`` to signal "push-down not viable, use the in-memory + ``meta_filter`` fallback". Reasons for ``None``: + + - Active doc store is not Elasticsearch (Infinity / OceanBase have + different filter semantics for the JSON ``meta_fields`` column). + - One of the user filters cannot be expressed in ES DSL. + - The ES request itself failed (network, mapping, missing index). + + On success returns the deduplicated, ordered list of document IDs the + ES query matched. Callers can union or intersect this with their own + base ``doc_ids`` rather than fetching the entire metadata table. + """ + from common.metadata_es_filter import ( + UnsupportedMetaFilter, + build_meta_filter_query, + extract_doc_ids, + is_pushdown_supported, + ) + + if not kb_ids: + return [] + + if settings.DOC_ENGINE_INFINITY: + # Infinity stores ``meta_fields`` as a JSON column without dotted + # field access; the in-memory path is still the reliable answer. + return None + + es_client = getattr(settings.docStoreConn, "es", None) + if es_client is None: + return None + + if not is_pushdown_supported(filters): + return None + + try: + kb = Knowledgebase.get_by_id(kb_ids[0]) + except Exception as e: + logging.warning(f"[meta_pushdown] cannot resolve tenant for kb {kb_ids[0]}: {e}") + return None + if not kb: + return None + + tenant_id = kb.tenant_id + index_name = cls._get_doc_meta_index_name(tenant_id) + + try: + if not settings.docStoreConn.index_exist(index_name, ""): + # No metadata index → no metadata-filtered docs. Returning an + # empty list (rather than ``None``) so callers don't bounce + # back to the in-memory path and re-query MySQL for nothing. + return [] + except Exception as e: + logging.warning(f"[meta_pushdown] index_exist check failed for {index_name}: {e}") + return None + + try: + query_body = build_meta_filter_query(filters, logic, kb_ids) + except UnsupportedMetaFilter as e: + logging.debug(f"[meta_pushdown] falling back to in-memory: {e.reason}") + return None + + # Only the doc id is needed downstream; trimming ``_source`` keeps the + # response small when the metadata blob is large. + request_body = { + **query_body, + "size": limit, + "_source": ["id"], + } + + try: + response = es_client.search(index=index_name, body=request_body) + except Exception as e: + logging.warning(f"[meta_pushdown] ES query failed for {index_name}: {e}") + return None + + doc_ids = extract_doc_ids(response if isinstance(response, dict) else dict(response)) + # Preserve order while removing duplicates so caller-side de-dupe stays + # cheap. + seen: set[str] = set() + unique: List[str] = [] + for did in doc_ids: + if did in seen: + continue + seen.add(did) + unique.append(did) + + if len(unique) >= limit: + logging.warning( + f"[meta_pushdown] hit limit {limit} for KBs {kb_ids}; some matches may be missing" + ) + + logging.debug(f"[meta_pushdown] {len(unique)} matches for KBs {kb_ids}") + return unique + @classmethod def get_metadata_keys_by_kbs(cls, kb_ids: List[str]) -> List[str]: """ diff --git a/common/metadata_es_filter.py b/common/metadata_es_filter.py new file mode 100644 index 0000000000..afe0f27386 --- /dev/null +++ b/common/metadata_es_filter.py @@ -0,0 +1,580 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Translate RAGflow document-metadata filter lists into Elasticsearch DSL. + +The legacy ``common.metadata_utils.meta_filter`` evaluates user-defined +metadata conditions in Python after loading every document's metadata into +memory. That works for small knowledge bases but degrades badly past a few +thousand documents. This module produces an equivalent ES bool query so the +filtering can be pushed down to the search engine. + +Operators handled here mirror ``meta_filter`` exactly. When a filter cannot be +translated (unknown operator, malformed value, list-typed input that the +in-memory code special-cases) the translator raises +:class:`UnsupportedMetaFilter` so callers fall back to the in-memory path +without silently changing semantics. +""" + +from __future__ import annotations + +import ast +import re +from dataclasses import dataclass, field +from typing import Any, Dict, Iterable, List, Optional, Sequence + +# Field prefix in the doc-metadata ES index. Every user metadata key lives at +# ``meta_fields.`` thanks to the dynamic object mapping in +# ``conf/doc_meta_es_mapping.json``. +META_FIELDS_PREFIX = "meta_fields" + +# Strict ``YYYY-MM-DD`` recogniser, kept consistent with the legacy in-memory +# path. Mismatched-type comparisons (string vs date, list vs scalar) fall back +# to in-memory semantics rather than guess at the right ES coercion. +_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") + +# Operators that the legacy filter exposes. Anything outside this set is a bug +# elsewhere; surface it instead of silently no-op'ing. +SUPPORTED_OPERATORS: frozenset[str] = frozenset( + { + "=", + "≠", + ">", + "<", + "≥", + "≤", + "in", + "not in", + "contains", + "not contains", + "start with", + "end with", + "empty", + "not empty", + } +) + +# ES range comparators keyed by RAGflow operator. +_RANGE_OPS: Dict[str, str] = { + ">": "gt", + "<": "lt", + "≥": "gte", + "≤": "lte", +} + +# Negative operators that diverge from ``meta_filter`` on multi-valued metadata +# fields. The in-memory path checks each value bucket independently, so a doc +# whose field is ``[a, b]`` matches ``≠ a`` (because the ``b`` bucket satisfies +# the predicate). ``must_not term: a`` in ES would exclude that doc outright. +# Without a cheap way to prove a field is single-valued at query time we refuse +# push-down for these operators and let the in-memory fallback handle them. +# ``not contains`` is not in this set: ``all(not contains)`` is equivalent to +# ``not any(contains)``, so ``must_not wildcard *X*`` matches the legacy +# semantics on both single- and multi-valued fields. +MULTIVALUE_UNSAFE_NEGATIVE_OPS: frozenset[str] = frozenset({"≠", "not in"}) + + +class UnsupportedMetaFilter(Exception): + """Raised when a metadata filter cannot be expressed as ES DSL. + + Carries the filter that failed so callers can log a precise reason and the + in-memory fallback can pick up unchanged. + """ + + def __init__(self, reason: str, filter_clause: Optional[Dict[str, Any]] = None) -> None: + super().__init__(reason) + self.reason = reason + self.filter_clause = filter_clause + + +@dataclass +class TranslatedFilter: + """A single user filter rendered as one or more ES bool clauses. + + A clause that wants the field to be present (``≠``, ``not in``, range, + ``not contains``) goes into ``must`` so the negation does not accidentally + match documents missing the key. ``must_not`` carries the actual rejection. + Pure positive filters (``=``, ``contains``, ``in``, ``exists``) fill + ``must`` only. + """ + + must: List[Dict[str, Any]] = field(default_factory=list) + must_not: List[Dict[str, Any]] = field(default_factory=list) + + def to_clauses(self) -> List[Dict[str, Any]]: + """Collapse to the ES clauses this filter contributes to a parent bool. + + Always emits a single atomic clause when there is anything to emit: + a multi-clause ``must`` (e.g. range = ``exists`` + ``range``) gets + wrapped in its own ``bool`` so an OR-logic parent ``should`` can't + match on just one half of the filter. A pure single positive clause + is returned unwrapped because there is nothing to break apart. + """ + if not self.must and not self.must_not: + return [] + if not self.must_not: + if len(self.must) == 1: + return list(self.must) + # Multi-clause positive filter — keep it atomic for OR parents. + return [{"bool": {"must": list(self.must)}}] + # Negative semantics always need wrapping so they survive being OR'd + # with siblings. + return [{"bool": {"must": list(self.must), "must_not": list(self.must_not)}}] + + +@dataclass +class MetaFilterPushdownPlan: + """Composed ES bool query body for an entire RAGflow filter request.""" + + logic: str + translated: List[TranslatedFilter] = field(default_factory=list) + + def is_empty(self) -> bool: + return not self.translated + + def to_query(self, kb_ids: Sequence[str]) -> Dict[str, Any]: + """Render the full ES query body, scoped to the given KB ids. + + The KB filter is always a ``terms`` clause so the query can serve any + number of knowledge bases without rewriting the caller. + """ + kb_clause = {"terms": {"kb_id": list(kb_ids)}} + + if self.is_empty(): + return {"query": {"bool": {"filter": [kb_clause]}}} + + sub_clauses = [t.to_clauses() for t in self.translated] + flat_clauses: List[Dict[str, Any]] = [c for group in sub_clauses for c in group] + + if self.logic == "or": + inner = { + "bool": { + "should": flat_clauses, + "minimum_should_match": 1, + } + } + else: + inner = {"bool": {"must": flat_clauses}} + + return { + "query": { + "bool": { + "filter": [kb_clause, inner], + } + } + } + + +class MetaFilterTranslator: + """Translate one user filter clause at a time into ES DSL fragments. + + Stateless aside from configuration; safe to instantiate once per request + or share at module scope. + """ + + def __init__(self, prefix: str = META_FIELDS_PREFIX) -> None: + self.prefix = prefix + + def field_name(self, key: str) -> str: + """Compose the dotted ES field path for a user metadata key.""" + return f"{self.prefix}.{key}" + + def translate(self, flt: Dict[str, Any]) -> TranslatedFilter: + """Translate a single filter dict into ES bool clauses. + + Raises ``UnsupportedMetaFilter`` for malformed input or operator/value + combinations the legacy in-memory path treats as a special case (e.g. + list-of-strings membership in ``in``/``not in``). + """ + op = flt.get("op") + key = flt.get("key") + value = flt.get("value") + + if not key or not isinstance(key, str): + raise UnsupportedMetaFilter("filter is missing a string key", flt) + if op not in SUPPORTED_OPERATORS: + raise UnsupportedMetaFilter(f"unknown operator {op!r}", flt) + + field_path = self.field_name(key) + + if op == "empty": + return self._translate_empty(field_path) + if op == "not empty": + return self._translate_not_empty(field_path) + if op == "=": + return self._translate_equal(field_path, value, flt) + if op == "≠": + return self._translate_not_equal(field_path, value, flt) + if op in _RANGE_OPS: + return self._translate_range(field_path, op, value, flt) + if op == "in": + return self._translate_in(field_path, value, flt) + if op == "not in": + return self._translate_not_in(field_path, value, flt) + if op == "contains": + return self._translate_contains(field_path, value, flt) + if op == "not contains": + return self._translate_not_contains(field_path, value, flt) + if op == "start with": + return self._translate_start_with(field_path, value, flt) + if op == "end with": + return self._translate_end_with(field_path, value, flt) + + # Unreachable: SUPPORTED_OPERATORS gate above covers every branch. + raise UnsupportedMetaFilter(f"no handler for operator {op!r}", flt) + + def _translate_empty(self, field_path: str) -> TranslatedFilter: + # "empty" matches documents whose value is missing OR equals "" — same + # falsy semantics the in-memory ``not input`` check enforces. The + # blank-string check has to target ``.keyword`` because the analyzed + # text field drops empty values during tokenisation, leaving no token + # for ``term: ""`` to match. + return TranslatedFilter( + must=[ + { + "bool": { + "should": [ + {"bool": {"must_not": [{"exists": {"field": field_path}}]}}, + {"term": {_keyword_path(field_path): ""}}, + ], + "minimum_should_match": 1, + } + } + ] + ) + + def _translate_not_empty(self, field_path: str) -> TranslatedFilter: + return TranslatedFilter( + must=[{"exists": {"field": field_path}}], + must_not=[{"term": {_keyword_path(field_path): ""}}], + ) + + def _translate_equal(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + coerced = _coerce_scalar(value, flt) + return TranslatedFilter(must=[_term_or_match(field_path, coerced)]) + + def _translate_not_equal(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + coerced = _coerce_scalar(value, flt) + return TranslatedFilter( + must=[{"exists": {"field": field_path}}], + must_not=[_term_or_match(field_path, coerced)], + ) + + def _translate_range(self, field_path: str, op: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + coerced = _coerce_range_value(value, flt) + return TranslatedFilter( + must=[ + {"exists": {"field": field_path}}, + {"range": {field_path: {_RANGE_OPS[op]: coerced}}}, + ] + ) + + def _translate_in(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + members = _csv_or_list(value, flt) + return TranslatedFilter(must=[_terms_string_or_numeric(field_path, members)]) + + def _translate_not_in(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + members = _csv_or_list(value, flt) + return TranslatedFilter( + must=[{"exists": {"field": field_path}}], + must_not=[_terms_string_or_numeric(field_path, members)], + ) + + def _translate_contains(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + text = _coerce_string(value, flt) + return TranslatedFilter(must=[_wildcard(field_path, f"*{_escape_wildcard(text)}*")]) + + def _translate_not_contains(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + text = _coerce_string(value, flt) + return TranslatedFilter( + must=[{"exists": {"field": field_path}}], + must_not=[_wildcard(field_path, f"*{_escape_wildcard(text)}*")], + ) + + def _translate_start_with(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + text = _coerce_string(value, flt) + return TranslatedFilter( + must=[{"prefix": {_keyword_path(field_path): {"value": text, "case_insensitive": True}}}] + ) + + def _translate_end_with(self, field_path: str, value: Any, flt: Dict[str, Any]) -> TranslatedFilter: + text = _coerce_string(value, flt) + return TranslatedFilter(must=[_wildcard(field_path, f"*{_escape_wildcard(text)}")]) + + +def build_meta_filter_query( + filters: Sequence[Dict[str, Any]], + logic: str, + kb_ids: Sequence[str], + translator: Optional[MetaFilterTranslator] = None, +) -> Dict[str, Any]: + """Top-level helper: translate every filter and render the ES query body. + + Raises ``UnsupportedMetaFilter`` if any filter cannot be expressed. + """ + plan = plan_pushdown(filters, logic, translator=translator) + return plan.to_query(kb_ids) + + +def plan_pushdown( + filters: Sequence[Dict[str, Any]], + logic: str, + translator: Optional[MetaFilterTranslator] = None, +) -> MetaFilterPushdownPlan: + """Translate every filter in turn, building a single composed plan. + + Separated from ``build_meta_filter_query`` so callers can inspect or + augment the plan before binding it to a KB scope. + """ + if logic not in {"and", "or"}: + raise UnsupportedMetaFilter(f"unknown logic {logic!r}") + + t = translator or MetaFilterTranslator() + plan = MetaFilterPushdownPlan(logic=logic) + for flt in filters: + plan.translated.append(t.translate(flt)) + return plan + + +def is_pushdown_supported(filters: Sequence[Dict[str, Any]]) -> bool: + """Cheap pre-check: do all filters look translatable without coercion? + + Used by the routing layer to skip the heavier ``plan_pushdown`` call when + the request obviously needs the in-memory fallback. + + Operators in :data:`MULTIVALUE_UNSAFE_NEGATIVE_OPS` are rejected here so a + single such filter forces the whole request to in-memory evaluation, which + is the only place we can replicate the per-bucket semantics over + multi-valued metadata fields. + """ + for flt in filters: + op = flt.get("op") + if op not in SUPPORTED_OPERATORS: + return False + if op in MULTIVALUE_UNSAFE_NEGATIVE_OPS: + return False + if not isinstance(flt.get("key"), str) or not flt.get("key"): + return False + return True + + +def extract_doc_ids(es_response: Dict[str, Any]) -> List[str]: + """Pull doc IDs out of an ES search response shaped like ``{hits:{hits:[...]}}``. + + Tolerates both the dict-typed ES 7+ response and the dict-coerced + ``ObjectApiResponse`` returned by the elasticsearch python client. + """ + hits_root = es_response.get("hits") if isinstance(es_response, dict) else None + if not hits_root: + # ``ObjectApiResponse`` is dict-like; ``.get`` works at both levels. + try: + hits_root = es_response["hits"] + except Exception: + return [] + + raw_hits: Iterable[Dict[str, Any]] + if isinstance(hits_root, dict): + raw_hits = hits_root.get("hits", []) or [] + else: + raw_hits = [] + + out: List[str] = [] + for hit in raw_hits: + if not isinstance(hit, dict): + continue + # ``id`` is mirrored into ``_source`` by the metadata writer; ``_id`` + # is the canonical identifier. Prefer ``_id`` so renames in the source + # field name don't break us. + doc_id = hit.get("_id") + if not doc_id: + source = hit.get("_source") or {} + doc_id = source.get("id") or source.get("doc_id") + if doc_id: + out.append(str(doc_id)) + return out + + +# --------------------------------------------------------------------------- +# Value coercion helpers +# --------------------------------------------------------------------------- + + +def _coerce_scalar(value: Any, flt: Dict[str, Any]) -> Any: + """Mirror the legacy ``ast.literal_eval`` then ``str.lower()`` flow. + + The in-memory filter parses values as Python literals when possible (so + ``"5"`` becomes ``5``) and lower-cases strings. For ES ``term`` queries we + need the same coercion or numeric data won't match. + """ + if value is None: + raise UnsupportedMetaFilter("scalar comparison value is None", flt) + if isinstance(value, (list, dict)): + raise UnsupportedMetaFilter("scalar comparison value is non-scalar", flt) + + s = str(value).strip() + if _DATE_RE.match(s): + return s + try: + parsed = ast.literal_eval(s) + except Exception: + parsed = s + if isinstance(parsed, str): + return parsed.lower() + if isinstance(parsed, (int, float, bool)): + return parsed + return s.lower() + + +def _coerce_range_value(value: Any, flt: Dict[str, Any]) -> Any: + """Range comparisons accept dates verbatim and numbers parsed via literal_eval. + + Strings that aren't numeric or ISO dates are pushed through as-is — ES + will compare them lexically against keyword fields, which is the same + behaviour as the in-memory ``input >= value`` Python comparison after the + original ``ast.literal_eval`` failure path. + """ + if value is None: + raise UnsupportedMetaFilter("range comparison value is None", flt) + s = str(value).strip() + if _DATE_RE.match(s): + return s + try: + parsed = ast.literal_eval(s) + except Exception: + return s + if isinstance(parsed, (int, float)): + return parsed + return s + + +def _coerce_string(value: Any, flt: Dict[str, Any]) -> str: + """String operators (contains/start with/end with) need a non-empty string.""" + if value is None: + raise UnsupportedMetaFilter("string-operator value is None", flt) + if isinstance(value, (list, dict)): + raise UnsupportedMetaFilter("string-operator value must be a scalar", flt) + s = str(value) + if not s: + raise UnsupportedMetaFilter("string-operator value is empty", flt) + return s + + +def _csv_or_list(value: Any, flt: Dict[str, Any]) -> List[Any]: + """``in`` / ``not in`` accept either a real list or a comma-separated string. + + The legacy in-memory path applies ``ast.literal_eval`` to the value too. + Mirror that for parity, then trim whitespace and lower-case any strings. + """ + if value is None: + raise UnsupportedMetaFilter("membership value is None", flt) + + if isinstance(value, (list, tuple)): + members = list(value) + elif isinstance(value, str): + try: + parsed = ast.literal_eval(value) + except Exception: + parsed = value + if isinstance(parsed, (list, tuple)): + members = list(parsed) + else: + members = [m.strip() for m in value.split(",") if m.strip()] + else: + members = [value] + + if not members: + raise UnsupportedMetaFilter("membership value resolved to empty list", flt) + + normalised: List[Any] = [] + for m in members: + if isinstance(m, str): + normalised.append(m.lower().strip()) + else: + normalised.append(m) + return normalised + + +def _keyword_path(field_path: str) -> str: + """Sub-field used for exact-match string queries. + + Dynamic mapping under ``meta_fields`` indexes string values as ``text`` + with a ``.keyword`` multi-field. ``term``/``terms``/``prefix``/``wildcard`` + against the analyzed parent breaks for any multi-word value because the + inverted index stores per-token entries, not the original phrase. Routing + string queries through ``.keyword`` keeps semantics aligned with the + in-memory ``meta_filter`` (full-string compare after lower-casing). + """ + return f"{field_path}.keyword" + + +def _term_or_match(field_path: str, value: Any) -> Dict[str, Any]: + """Exact-match clause that respects how dynamic mapping indexes the value. + + String values target the ``.keyword`` sub-field with ``case_insensitive`` + so phrase values still match (the in-memory path lower-cases before + comparing). Numeric / bool values target the parent path because numeric + fields have no ``.keyword`` sub-field under default dynamic mapping. + """ + if isinstance(value, str): + return { + "term": { + _keyword_path(field_path): { + "value": value, + "case_insensitive": True, + } + } + } + return {"term": {field_path: value}} + + +def _terms_string_or_numeric(field_path: str, members: List[Any]) -> Dict[str, Any]: + """``in``/``not in`` payload that mirrors ``_term_or_match`` per element. + + ES ``terms`` does not accept ``case_insensitive``, so for string members we + expand into a ``bool: should`` of case-insensitive ``term`` queries on the + keyword sub-field. Pure-numeric / bool member lists keep the cheaper + ``terms`` form on the parent path. + """ + if all(not isinstance(m, str) for m in members): + return {"terms": {field_path: members}} + return { + "bool": { + "should": [_term_or_match(field_path, m) for m in members], + "minimum_should_match": 1, + } + } + + +def _wildcard(field_path: str, pattern: str) -> Dict[str, Any]: + """Wildcard runs against ``.keyword`` so the original phrase is searched. + + ``wildcard`` against an analyzed text field walks per-token entries, which + drops phrase context (``Alice Wonderland`` becomes tokens ``alice``, + ``wonderland``). The ``.keyword`` sub-field preserves the full original + string, matching the in-memory ``str.find`` semantics. + """ + return { + "wildcard": { + _keyword_path(field_path): { + "value": pattern, + "case_insensitive": True, + } + } + } + + +def _escape_wildcard(text: str) -> str: + """Escape the two ES wildcard metacharacters so user input stays literal.""" + return text.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?") diff --git a/common/metadata_utils.py b/common/metadata_utils.py index 79db193ebe..c2fc90b541 100644 --- a/common/metadata_utils.py +++ b/common/metadata_utils.py @@ -166,11 +166,13 @@ def meta_filter(metas: dict, filters: list[dict], logic: str = "and"): async def apply_meta_data_filter( meta_data_filter: dict | None, - metas: dict, - question: str, + metas: dict | None = None, + question: str = "", chat_mdl: Any = None, base_doc_ids: list[str] | None = None, manual_value_resolver: Callable[[dict], dict] | None = None, + kb_ids: list[str] | None = None, + metas_loader: Callable[[], dict] | None = None, ) -> list[str] | None: """ Apply metadata filtering rules and return the filtered doc_ids. @@ -180,6 +182,20 @@ async def apply_meta_data_filter( - semi_auto: generate conditions using selected metadata keys only - manual: directly filter based on provided conditions + When ``kb_ids`` is supplied and the active doc store is Elasticsearch the + generated filter conditions are pushed down to ES via + ``DocMetadataService.filter_doc_ids_by_meta_pushdown`` instead of being + evaluated in Python over ``metas``. The in-memory ``meta_filter`` path + remains the fallback so callers without a KB scope, or backends without + push-down support, behave exactly as before. + + ``metas`` may be supplied eagerly or via ``metas_loader``. The loader is + only invoked when the metadata dict is actually needed — i.e. for the LLM + context in ``auto`` / ``semi_auto`` modes, or as the in-memory fallback + when push-down can't service a request. ``manual`` mode that lands on the + push-down path therefore skips the expensive + ``get_flatted_meta_by_kbs`` round-trip entirely. + Returns: list of doc_ids, ["-999"] when manual filters yield no result, or None when auto/semi_auto filters return empty. @@ -193,9 +209,28 @@ async def apply_meta_data_filter( method = meta_data_filter.get("method") + # Memoised metadata loader. ``_get_metas`` materialises the dict at most + # once per call; downstream branches that never reach an in-memory eval + # leave the loader untouched. + cached_metas: dict | None = metas + + def _get_metas() -> dict: + nonlocal cached_metas + if cached_metas is None: + cached_metas = metas_loader() if metas_loader else {} + return cached_metas + + def _evaluate(conditions: list[dict], logic: str) -> list[str]: + """Run conditions through ES push-down when possible, in-memory otherwise.""" + if conditions and kb_ids: + pushed = _try_meta_pushdown(kb_ids, conditions, logic) + if pushed is not None: + return pushed + return meta_filter(_get_metas(), conditions, logic) + if method == "auto": - filters: dict = await gen_meta_filter(chat_mdl, metas, question) - doc_ids.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and"))) + filters: dict = await gen_meta_filter(chat_mdl, _get_metas(), question) + doc_ids.extend(_evaluate(filters["conditions"], filters.get("logic", "and"))) if not doc_ids: return None elif method == "semi_auto": @@ -212,23 +247,47 @@ async def apply_meta_data_filter( constraints[key] = op if selected_keys: - filtered_metas = {key: metas[key] for key in selected_keys if key in metas} + current_metas = _get_metas() + filtered_metas = {key: current_metas[key] for key in selected_keys if key in current_metas} if filtered_metas: filters: dict = await gen_meta_filter(chat_mdl, filtered_metas, question, constraints=constraints) - doc_ids.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and"))) + doc_ids.extend(_evaluate(filters["conditions"], filters.get("logic", "and"))) if not doc_ids: return None elif method == "manual": filters = meta_data_filter.get("manual", []) if manual_value_resolver: filters = [manual_value_resolver(flt) for flt in filters] - doc_ids.extend(meta_filter(metas, filters, meta_data_filter.get("logic", "and"))) + doc_ids.extend(_evaluate(filters, meta_data_filter.get("logic", "and"))) if filters and not doc_ids: doc_ids = ["-999"] return doc_ids +def _try_meta_pushdown( + kb_ids: list[str], + conditions: list[dict], + logic: str, +) -> list[str] | None: + """Attempt the ES push-down path; return ``None`` to fall back in-memory. + + Lazy-imports ``DocMetadataService`` so this module stays usable in + environments where the API/db layer hasn't been wired up (e.g. unit tests + that exercise ``meta_filter`` directly). + """ + try: + from api.db.services.doc_metadata_service import DocMetadataService + except Exception as e: + logging.debug(f"[apply_meta_data_filter] push-down disabled, import failed: {e}") + return None + try: + return DocMetadataService.filter_doc_ids_by_meta_pushdown(kb_ids, conditions, logic) + except Exception as e: + logging.warning(f"[apply_meta_data_filter] push-down errored, falling back: {e}") + return None + + def dedupe_list(values: list) -> list: seen = set() deduped = [] diff --git a/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py b/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py index 6d2dcbf3a7..de548361c4 100644 --- a/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py +++ b/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py @@ -1476,7 +1476,9 @@ def test_searchbots_retrieval_test_embedded_matrix_unit(monkeypatch): ) monkeypatch.setattr(module.DocMetadataService, "get_flatted_meta_by_kbs", lambda _kb_ids: [{"id": "doc-1"}]) - async def _apply_filter(_meta_filter, _metas, _question, _chat_mdl, _local_doc_ids): + async def _apply_filter(_meta_filter, _metas, _question, _chat_mdl, _local_doc_ids, **kwargs): + assert kwargs["kb_ids"] == ["kb-1"] + assert callable(kwargs["metas_loader"]) return ["doc-filtered"] monkeypatch.setattr(module, "apply_meta_data_filter", _apply_filter) diff --git a/test/unit_test/common/test_metadata_es_filter.py b/test/unit_test/common/test_metadata_es_filter.py new file mode 100644 index 0000000000..eb8217909e --- /dev/null +++ b/test/unit_test/common/test_metadata_es_filter.py @@ -0,0 +1,473 @@ +"""Unit tests for the Elasticsearch push-down translator. + +These tests cover the public surface of ``common.metadata_es_filter`` without +touching the live ES cluster. They verify the shape of the produced query DSL +operator-by-operator and confirm that the parity rules with the in-memory +``meta_filter`` (lower-casing, list-membership coercion, date detection) hold. +""" + +import pytest + +from common.metadata_es_filter import ( + META_FIELDS_PREFIX, + MetaFilterPushdownPlan, + MetaFilterTranslator, + SUPPORTED_OPERATORS, + UnsupportedMetaFilter, + build_meta_filter_query, + extract_doc_ids, + is_pushdown_supported, + plan_pushdown, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def translator() -> MetaFilterTranslator: + return MetaFilterTranslator() + + +def _field(key: str) -> str: + return f"{META_FIELDS_PREFIX}.{key}" + + +# --------------------------------------------------------------------------- +# Translator: per-operator shape +# --------------------------------------------------------------------------- + + +def test_equal_translates_to_term_with_lowercased_value(translator): + """String equality runs against ``.keyword`` so multi-word phrases match. + + Querying the analyzed parent field with ``term`` only matches docs whose + inverted index contains the literal phrase token, which never happens for + multi-word values. The ``.keyword`` sub-field stores the unmodified string, + and ``case_insensitive: true`` keeps the lower-cased compare semantics from + the in-memory ``meta_filter``. + """ + clauses = translator.translate({"key": "tag", "op": "=", "value": "Alpha"}).to_clauses() + assert clauses == [ + {"term": {_field("tag") + ".keyword": {"value": "alpha", "case_insensitive": True}}} + ] + + +def test_equal_parses_numeric_literal(translator): + """Numeric values stay on the parent path — no ``.keyword`` sub-field exists for ``long``.""" + clauses = translator.translate({"key": "score", "op": "=", "value": "5"}).to_clauses() + assert clauses == [{"term": {_field("score"): 5}}] + + +def test_equal_multiword_uses_keyword_subfield(translator): + """Regression for qinling0210's report: multi-word string values must match. + + Before the keyword-routing fix this emitted + ``term: meta_fields.author = "alice wonderland"`` against an analyzed text + field, which never matched (inverted index only contained per-token + entries). Routing through ``.keyword`` preserves the full phrase. + """ + clauses = translator.translate( + {"key": "author", "op": "=", "value": "Alice Wonderland"} + ).to_clauses() + assert clauses == [ + { + "term": { + _field("author") + ".keyword": { + "value": "alice wonderland", + "case_insensitive": True, + } + } + } + ] + + +def test_not_equal_requires_field_to_exist(translator): + clauses = translator.translate({"key": "tag", "op": "≠", "value": "alpha"}).to_clauses() + assert clauses == [ + { + "bool": { + "must": [{"exists": {"field": _field("tag")}}], + "must_not": [ + {"term": {_field("tag") + ".keyword": {"value": "alpha", "case_insensitive": True}}} + ], + } + } + ] + + +@pytest.mark.parametrize( + "op,es_key", + [(">", "gt"), ("<", "lt"), ("≥", "gte"), ("≤", "lte")], +) +def test_range_operator_translation(translator, op, es_key): + # Multi-clause positive filters wrap into a single bool so OR-logic + # parents can't match on just the ``exists`` half of the range. + clauses = translator.translate({"key": "score", "op": op, "value": "10"}).to_clauses() + assert clauses == [ + { + "bool": { + "must": [ + {"exists": {"field": _field("score")}}, + {"range": {_field("score"): {es_key: 10}}}, + ] + } + } + ] + + +def test_range_passes_iso_date_through_unparsed(translator): + clauses = translator.translate({"key": "published", "op": "≥", "value": "2025-01-15"}).to_clauses() + range_clause = clauses[0]["bool"]["must"][1] + assert range_clause == {"range": {_field("published"): {"gte": "2025-01-15"}}} + + +def _string_terms_should(field_path: str, members): + """``in``/``not in`` over string members expands per-element so each ``term`` + can carry ``case_insensitive`` (``terms`` does not accept that flag).""" + return { + "bool": { + "should": [ + {"term": {field_path + ".keyword": {"value": m, "case_insensitive": True}}} + for m in members + ], + "minimum_should_match": 1, + } + } + + +def test_in_operator_csv_value_lowercased(translator): + clauses = translator.translate({"key": "status", "op": "in", "value": "Active,Pending"}).to_clauses() + assert clauses == [_string_terms_should(_field("status"), ["active", "pending"])] + + +def test_in_operator_python_list_literal(translator): + clauses = translator.translate({"key": "status", "op": "in", "value": "['Open', 'Closed']"}).to_clauses() + assert clauses == [_string_terms_should(_field("status"), ["open", "closed"])] + + +def test_in_operator_numeric_members_keep_terms(translator): + """All-numeric member lists keep the cheaper ``terms`` form on the parent path.""" + clauses = translator.translate({"key": "year", "op": "in", "value": "[2024, 2025]"}).to_clauses() + assert clauses == [{"terms": {_field("year"): [2024, 2025]}}] + + +def test_not_in_negates_with_existence_guard(translator): + clauses = translator.translate({"key": "status", "op": "not in", "value": "active,pending"}).to_clauses() + assert clauses == [ + { + "bool": { + "must": [{"exists": {"field": _field("status")}}], + "must_not": [_string_terms_should(_field("status"), ["active", "pending"])], + } + } + ] + + +def test_contains_uses_case_insensitive_wildcard(translator): + clauses = translator.translate({"key": "version", "op": "contains", "value": "earth"}).to_clauses() + assert clauses == [ + { + "wildcard": { + _field("version") + ".keyword": { + "value": "*earth*", + "case_insensitive": True, + } + } + } + ] + + +def test_contains_escapes_user_wildcards(translator): + clauses = translator.translate({"key": "title", "op": "contains", "value": "a*b?c"}).to_clauses() + pattern = clauses[0]["wildcard"][_field("title") + ".keyword"]["value"] + assert pattern == "*a\\*b\\?c*" + + +def test_not_contains_negates_with_exists(translator): + clauses = translator.translate({"key": "version", "op": "not contains", "value": "earth"}).to_clauses() + assert clauses == [ + { + "bool": { + "must": [{"exists": {"field": _field("version")}}], + "must_not": [ + { + "wildcard": { + _field("version") + ".keyword": { + "value": "*earth*", + "case_insensitive": True, + } + } + } + ], + } + } + ] + + +def test_start_with_uses_prefix(translator): + clauses = translator.translate({"key": "name", "op": "start with", "value": "pre"}).to_clauses() + assert clauses == [ + {"prefix": {_field("name") + ".keyword": {"value": "pre", "case_insensitive": True}}} + ] + + +def test_end_with_uses_trailing_wildcard(translator): + clauses = translator.translate({"key": "file", "op": "end with", "value": ".pdf"}).to_clauses() + pattern = clauses[0]["wildcard"][_field("file") + ".keyword"]["value"] + assert pattern == "*.pdf" + + +def test_empty_matches_missing_or_blank(translator): + clauses = translator.translate({"key": "notes", "op": "empty", "value": ""}).to_clauses() + assert clauses == [ + { + "bool": { + "should": [ + {"bool": {"must_not": [{"exists": {"field": _field("notes")}}]}}, + {"term": {_field("notes") + ".keyword": ""}}, + ], + "minimum_should_match": 1, + } + } + ] + + +def test_not_empty_requires_exists_and_excludes_blank(translator): + clauses = translator.translate({"key": "notes", "op": "not empty", "value": ""}).to_clauses() + assert clauses == [ + { + "bool": { + "must": [{"exists": {"field": _field("notes")}}], + "must_not": [{"term": {_field("notes") + ".keyword": ""}}], + } + } + ] + + +# --------------------------------------------------------------------------- +# Translator: validation paths +# --------------------------------------------------------------------------- + + +def test_unknown_operator_raises(translator): + with pytest.raises(UnsupportedMetaFilter) as exc: + translator.translate({"key": "tag", "op": "regex", "value": "^foo"}) + assert "regex" in exc.value.reason + + +def test_missing_key_raises(translator): + with pytest.raises(UnsupportedMetaFilter): + translator.translate({"op": "=", "value": "x"}) + + +def test_scalar_op_with_list_value_raises(translator): + with pytest.raises(UnsupportedMetaFilter): + translator.translate({"key": "tag", "op": "=", "value": ["a", "b"]}) + + +def test_string_op_with_empty_value_raises(translator): + with pytest.raises(UnsupportedMetaFilter): + translator.translate({"key": "tag", "op": "contains", "value": ""}) + + +def test_membership_with_empty_csv_raises(translator): + with pytest.raises(UnsupportedMetaFilter): + translator.translate({"key": "tag", "op": "in", "value": ""}) + + +def test_supported_operator_set_matches_documentation(): + expected = { + "=", + "≠", + ">", + "<", + "≥", + "≤", + "in", + "not in", + "contains", + "not contains", + "start with", + "end with", + "empty", + "not empty", + } + assert SUPPORTED_OPERATORS == expected + + +# --------------------------------------------------------------------------- +# Plan composition +# --------------------------------------------------------------------------- + + +def test_plan_emits_must_clauses_for_and_logic(): + plan = plan_pushdown( + [ + {"key": "tag", "op": "=", "value": "alpha"}, + {"key": "score", "op": ">", "value": "5"}, + ], + logic="and", + ) + assert isinstance(plan, MetaFilterPushdownPlan) + body = plan.to_query(["kb1"]) + bool_root = body["query"]["bool"] + assert bool_root["filter"][0] == {"terms": {"kb_id": ["kb1"]}} + inner = bool_root["filter"][1]["bool"] + assert "must" in inner + # Each translated filter contributes exactly one clause to the parent bool: + # ``=`` is a single ``term``; ``>`` is wrapped into one atomic ``bool``. + assert len(inner["must"]) == 2 + expected_tag_term = { + "term": {_field("tag") + ".keyword": {"value": "alpha", "case_insensitive": True}} + } + assert expected_tag_term in inner["must"] + range_wrap = { + "bool": { + "must": [ + {"exists": {"field": _field("score")}}, + {"range": {_field("score"): {"gt": 5}}}, + ] + } + } + assert range_wrap in inner["must"] + + +def test_range_filter_under_or_stays_atomic(): + """An OR'd range must not split into independent ``exists`` + ``range`` should branches.""" + body = build_meta_filter_query( + [ + {"key": "tag", "op": "=", "value": "alpha"}, + {"key": "score", "op": ">", "value": "5"}, + ], + logic="or", + kb_ids=["kb1"], + ) + should = body["query"]["bool"]["filter"][1]["bool"]["should"] + # Two filters → two should branches, not three or four. + assert len(should) == 2 + assert { + "term": {_field("tag") + ".keyword": {"value": "alpha", "case_insensitive": True}} + } in should + + +def test_plan_emits_should_clauses_for_or_logic(): + plan = plan_pushdown( + [ + {"key": "tag", "op": "=", "value": "alpha"}, + {"key": "tag", "op": "=", "value": "beta"}, + ], + logic="or", + ) + inner = plan.to_query(["kb1"])["query"]["bool"]["filter"][1]["bool"] + assert inner["minimum_should_match"] == 1 + assert len(inner["should"]) == 2 + + +def test_unknown_logic_rejected(): + with pytest.raises(UnsupportedMetaFilter): + plan_pushdown([{"key": "k", "op": "=", "value": "v"}], logic="xor") + + +def test_empty_filter_list_returns_kb_only_query(): + body = build_meta_filter_query([], "and", ["kb1", "kb2"]) + assert body == {"query": {"bool": {"filter": [{"terms": {"kb_id": ["kb1", "kb2"]}}]}}} + + +def test_negative_filter_in_or_logic_keeps_negation_scope(): + """Wrapping ``≠`` in an OR should not let the ``must_not`` swallow other branches. + + ``≠`` is rejected by :func:`is_pushdown_supported` for multi-value safety, so + this test exercises the translator directly to confirm the per-filter + wrapping invariant. The same shape protects ``not contains`` (which IS + pushed down) from leaking its ``must_not`` into a parent should. + """ + body = build_meta_filter_query( + [ + {"key": "tag", "op": "=", "value": "alpha"}, + {"key": "tag", "op": "≠", "value": "beta"}, + ], + logic="or", + kb_ids=["kb1"], + ) + inner = body["query"]["bool"]["filter"][1]["bool"] + should = inner["should"] + assert should[0] == { + "term": {_field("tag") + ".keyword": {"value": "alpha", "case_insensitive": True}} + } + # The ≠ branch is wrapped so its must_not does not bleed into the OR set. + assert "bool" in should[1] + assert "must_not" in should[1]["bool"] + + +# --------------------------------------------------------------------------- +# is_pushdown_supported pre-check +# --------------------------------------------------------------------------- + + +def test_pushdown_check_accepts_known_ops(): + assert is_pushdown_supported( + [ + {"key": "tag", "op": "=", "value": "v"}, + {"key": "tag", "op": "contains", "value": "x"}, + ] + ) + + +def test_pushdown_check_rejects_unknown_op(): + assert not is_pushdown_supported([{"key": "tag", "op": "regex", "value": "^v"}]) + + +def test_pushdown_check_rejects_missing_key(): + assert not is_pushdown_supported([{"op": "=", "value": "v"}]) + + +@pytest.mark.parametrize("op", ["≠", "not in"]) +def test_pushdown_check_rejects_multivalue_unsafe_negatives(op): + """Negatives that diverge on multi-valued fields force the in-memory fallback.""" + assert not is_pushdown_supported([{"key": "tag", "op": op, "value": "x"}]) + + +def test_pushdown_check_one_unsafe_op_rejects_whole_request(): + """Mixing one unsafe op with safe ones still falls back, preserving correctness.""" + assert not is_pushdown_supported( + [ + {"key": "tag", "op": "=", "value": "v"}, + {"key": "tag", "op": "≠", "value": "w"}, + ] + ) + + +def test_pushdown_check_accepts_not_contains(): + """``not contains`` stays in push-down; ``all(not contains)`` ≡ ``not any(contains)``.""" + assert is_pushdown_supported([{"key": "tag", "op": "not contains", "value": "x"}]) + + +# --------------------------------------------------------------------------- +# extract_doc_ids +# --------------------------------------------------------------------------- + + +def test_extract_doc_ids_from_dict_response(): + response = { + "hits": { + "hits": [ + {"_id": "doc1", "_source": {"id": "doc1"}}, + {"_id": "doc2", "_source": {"id": "doc2"}}, + ] + } + } + assert extract_doc_ids(response) == ["doc1", "doc2"] + + +def test_extract_doc_ids_falls_back_to_source_id(): + response = {"hits": {"hits": [{"_source": {"id": "src-id"}}]}} + assert extract_doc_ids(response) == ["src-id"] + + +def test_extract_doc_ids_empty_response(): + assert extract_doc_ids({}) == [] + assert extract_doc_ids({"hits": {}}) == [] + assert extract_doc_ids({"hits": {"hits": []}}) == []