diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py index 1cf887c2d3..34258c69f5 100644 --- a/api/db/services/doc_metadata_service.py +++ b/api/db/services/doc_metadata_service.py @@ -385,13 +385,25 @@ class DocMetadataService: if result: logging.error(f"Failed to insert metadata for document {doc_id}: {result}") return False - # Force ES refresh to make metadata immediately available for search + # Force refresh so metadata is immediately searchable. + # Both Elasticsearch and OpenSearch backends expose refresh_idx; + # Infinity does not need a manual refresh. if not settings.DOC_ENGINE_INFINITY: - try: - settings.docStoreConn.es.indices.refresh(index=index_name) - logging.debug(f"Refreshed metadata index: {index_name}") - except Exception as e: - logging.warning(f"Failed to refresh metadata index {index_name}: {e}") + refresh_idx = getattr(settings.docStoreConn, "refresh_idx", None) + if callable(refresh_idx): + if refresh_idx(index_name): + logging.debug(f"Refreshed metadata index: {index_name}") + else: + # A failed refresh can leave just-inserted metadata + # invisible to subsequent reads; surface it so operators + # can correlate stale-read complaints with the cause. + logging.warning( + f"Failed to refresh metadata index {index_name} on backend " + f"{type(settings.docStoreConn).__name__}; " + f"metadata may not be immediately searchable" + ) + else: + logging.debug(f"Backend {type(settings.docStoreConn).__name__} has no refresh_idx; skipping") logging.debug(f"Successfully inserted metadata for document {doc_id}") return True @@ -459,23 +471,23 @@ class DocMetadataService: [kb_id] ) if doc_exists: - # Document exists - replace meta_fields entirely - # Use upsert to fully replace the meta_fields field - # (ES update with doc parameter does deep merge on object fields, - # which would retain old keys that should be removed) - settings.docStoreConn.es.update( - index=index_name, - id=doc_id, - refresh=True, - body={ - "script": { - "source": "ctx._source.meta_fields = params.meta_fields", - "params": {"meta_fields": processed_meta} - } - } + # Document exists - replace meta_fields entirely. + # Using update with a `doc` body would deep-merge the meta_fields + # object and retain old keys that should be removed, so we delegate + # to a backend-provided scripted assignment that fully overwrites it. + replace_meta_fields = getattr(settings.docStoreConn, "replace_meta_fields", None) + if callable(replace_meta_fields) and replace_meta_fields(index_name, doc_id, processed_meta): + logging.debug(f"Successfully updated metadata for document {doc_id} via {type(settings.docStoreConn).__name__}.replace_meta_fields") + return True + logging.warning( + f"replace_meta_fields unavailable or failed on backend " + f"{type(settings.docStoreConn).__name__}; falling back to delete+insert" ) - logging.debug(f"Successfully updated metadata for document {doc_id} using ES script update") - return True + # Mirror the Infinity fallback below so a failed scripted + # replace still guarantees full overwrite semantics rather + # than leaking through the "document not found" branch. + cls.delete_document_metadata(doc_id, kb_id, tenant_id) + return cls.insert_document_metadata(doc_id, processed_meta) except Exception as e: logging.debug(f"Document {doc_id} not found in index, will insert: {e}") @@ -582,13 +594,18 @@ class DocMetadataService: logging.debug(f"[DROP EMPTY TABLE] Table {index_name} exists, checking if empty...") - # Use ES count API for accurate count - # Note: No need to refresh since delete operation already uses refresh=True + # Use the backend-native count primitive when available (ES + OS). + # No need to refresh since delete operation already uses refresh=True. + # The invocation lives inside the try/except so a future backend + # whose count_idx raises (instead of returning the -1 sentinel) + # still falls through to the search-based empty-table check. + count_idx = getattr(settings.docStoreConn, "count_idx", None) try: - count_response = settings.docStoreConn.es.count(index=index_name) - total_count = count_response['count'] - logging.debug(f"[DROP EMPTY TABLE] ES count API result: {total_count} documents") - is_empty = (total_count == 0) + count_value = count_idx(index_name) if callable(count_idx) else -1 + if count_value < 0: + raise RuntimeError("native count_idx unavailable or failed") + logging.debug(f"[DROP EMPTY TABLE] count_idx API result: {count_value} documents") + is_empty = (count_value == 0) except Exception as e: logging.warning(f"[DROP EMPTY TABLE] Count API failed, falling back to search: {e}") # Fallback to search if count fails diff --git a/common/doc_store/es_conn_base.py b/common/doc_store/es_conn_base.py index dccb8a2fe3..88615649f5 100644 --- a/common/doc_store/es_conn_base.py +++ b/common/doc_store/es_conn_base.py @@ -159,6 +159,61 @@ class ESConnectionBase(DocStoreConnection): except Exception as e: self.logger.exception(f"Error creating document metadata index {index_name}: {e}") + def refresh_idx(self, index_name: str) -> bool: + """ + Refresh an index so that recently inserted documents become searchable. + + Service layers should call this dispatch method instead of reaching + into ``self.es`` directly, so the OpenSearch and Elasticsearch + connections present a uniform abstract API. + """ + try: + self.es.indices.refresh(index=index_name) + return True + except NotFoundError: + return False + except Exception as e: + self.logger.warning(f"ESConnection.refresh_idx({index_name}) failed: {e}") + return False + + def count_idx(self, index_name: str) -> int: + """ + Return the document count for an index, or -1 if the call fails. + Used to decide whether a per-tenant metadata index is empty without + paying a full search. + """ + try: + response = self.es.count(index=index_name) + return int(response.get("count", 0)) + except NotFoundError: + return 0 + except Exception as e: + self.logger.warning(f"ESConnection.count_idx({index_name}) failed: {e}") + return -1 + + def replace_meta_fields(self, index_name: str, doc_id: str, meta_fields: dict) -> bool: + """ + Fully replace the ``meta_fields`` object on a single document. + + Using ES.update with a ``doc`` body would deep-merge object fields, + retaining old keys that should be removed. A scripted update assigns + the new meta_fields outright, matching delete-key semantics. + """ + body = { + "script": { + "source": "ctx._source.meta_fields = params.meta_fields", + "params": {"meta_fields": meta_fields}, + } + } + try: + self.es.update(index=index_name, id=doc_id, refresh=True, body=body) + return True + except NotFoundError: + return False + except Exception as e: + self.logger.warning(f"ESConnection.replace_meta_fields({index_name}, {doc_id}) failed: {e}") + return False + def delete_idx(self, index_name: str, dataset_id: str): if len(dataset_id) > 0: # The index need to be alive after any kb deletion since all kb under this tenant are in one index. diff --git a/rag/utils/opensearch_conn.py b/rag/utils/opensearch_conn.py index f2348b7346..2239102ef3 100644 --- a/rag/utils/opensearch_conn.py +++ b/rag/utils/opensearch_conn.py @@ -126,6 +126,99 @@ class OSConnection(DocStoreConnection): except Exception: logger.exception("OSConnection.createIndex error %s" % (indexName)) + def create_doc_meta_idx(self, index_name: str): + """ + Create a per-tenant document metadata index on OpenSearch. + + Mirrors ESConnectionBase.create_doc_meta_idx so that the + DocMetadataService dispatches uniformly across ES and OS backends. + Index name pattern: ragflow_doc_meta_{tenant_id} + """ + if self.index_exist(index_name, ""): + return True + try: + fp_mapping = os.path.join(get_project_base_directory(), "conf", "doc_meta_es_mapping.json") + if not os.path.exists(fp_mapping): + logger.error(f"Document metadata mapping file not found at {fp_mapping}") + return False + + with open(fp_mapping, "r") as f: + doc_meta_mapping = json.load(f) + + from opensearchpy.client import IndicesClient + body = { + "settings": doc_meta_mapping["settings"], + "mappings": doc_meta_mapping["mappings"], + } + return IndicesClient(self.os).create(index=index_name, body=body) + except Exception as e: + logger.exception(f"OSConnection.create_doc_meta_idx error creating {index_name}: {e}") + return False + + def refresh_idx(self, index_name: str) -> bool: + """ + Refresh an index so that recently inserted documents become searchable. + + DocMetadataService used to call ``settings.docStoreConn.es.indices.refresh`` + directly, which raised AttributeError on the OpenSearch backend because + OSConnection exposes ``self.os`` rather than ``self.es``. This wrapper + gives both backends a uniform abstract entry point. + """ + try: + self.os.indices.refresh(index=index_name) + return True + except NotFoundError: + return False + except Exception as e: + logger.warning(f"OSConnection.refresh_idx({index_name}) failed: {e}") + return False + + def count_idx(self, index_name: str) -> int: + """ + Return the document count for an index, or -1 if the call fails. + + Used by DocMetadataService._drop_empty_metadata_table to decide whether + a per-tenant metadata index is empty without paying a full search. + """ + try: + response = self.os.count(index=index_name) + return int(response.get("count", 0)) + except NotFoundError: + return 0 + except Exception as e: + logger.warning(f"OSConnection.count_idx({index_name}) failed: {e}") + return -1 + + def replace_meta_fields(self, index_name: str, doc_id: str, meta_fields: dict) -> bool: + """ + Replace the ``meta_fields`` object on a single document. + + ES.update with a ``doc`` body deep-merges object fields, which retains + old keys that should be removed. The fix in ESConnection is a script + that fully assigns the new meta_fields. We provide the same primitive + on OpenSearch so the service layer never reaches into ``self.es`` or + ``self.os`` directly. + """ + body = { + "script": { + "source": "ctx._source.meta_fields = params.meta_fields", + "params": {"meta_fields": meta_fields}, + } + } + for _ in range(ATTEMPT_TIME): + try: + self.os.update(index=index_name, id=doc_id, body=body, refresh=True) + return True + except NotFoundError: + return False + except Exception as e: + logger.warning(f"OSConnection.replace_meta_fields({index_name}, {doc_id}) failed: {e}") + if re.search(r"(timeout|connection)", str(e).lower()): + time.sleep(1) + continue + return False + return False + def delete_idx(self, indexName: str, knowledgebaseId: str): if len(knowledgebaseId) > 0: # The index need to be alive after any kb deletion since all kb under this tenant are in one index. diff --git a/test/unit_test/rag/utils/test_opensearch_doc_meta.py b/test/unit_test/rag/utils/test_opensearch_doc_meta.py new file mode 100644 index 0000000000..ead97f6f8b --- /dev/null +++ b/test/unit_test/rag/utils/test_opensearch_doc_meta.py @@ -0,0 +1,288 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Unit tests for the document-metadata helpers added to OSConnection. + +Covers issue #14570: PATCH /api/v1/datasets/{ds}/documents/{doc} with +{"meta_fields": {...}} previously raised +``'OSConnection' object has no attribute 'create_doc_meta_idx'`` when the +backend was OpenSearch. These tests pin the new dispatch surface so the same +regression cannot return: every helper that DocMetadataService dispatches to +on the ES path must exist on OSConnection too, with semantically equivalent +behaviour. + +The OpenSearch and Elasticsearch SDKs are imported at module load; mocking +the underlying client lets us exercise OSConnection methods in isolation +without a live cluster. +""" +from __future__ import annotations + +import sys +import types +from unittest.mock import MagicMock, patch + +import pytest + + +# Importing OSConnection touches opensearchpy at module load, so guard for +# environments where the package isn't installed. +opensearchpy = pytest.importorskip("opensearchpy") + + +def _install_module(name: str, **attrs) -> types.ModuleType: + mod = sys.modules.get(name) + if mod is None: + mod = types.ModuleType(name) + sys.modules[name] = mod + for key, value in attrs.items(): + if not hasattr(mod, key): + setattr(mod, key, value) + return mod + + +def _install_module_stubs() -> None: + """Bypass heavy optional backends for connection-only tests. + + ``rag.utils.opensearch_conn`` imports ``common.settings`` and ``rag.nlp`` + at module load. ``common.settings`` in turn pulls every storage backend + (Infinity, OceanBase, Azure, MinIO, GCS …), which is more surface than + these connection-only tests need. We replace just the modules opensearch_conn + captures so the real ``OSConnection`` class loads. + """ + _install_module( + "common.settings", + OS={"hosts": "stub", "username": "u", "password": "p"}, + ES={}, + DOC_ENGINE_INFINITY=False, + DOC_ENGINE_OCEANBASE=False, + DOC_ENGINE="opensearch", + docStoreConn=None, + ) + _install_module( + "rag.nlp", + is_english=lambda *_args, **_kwargs: False, + rag_tokenizer=MagicMock(), + ) + + +_install_module_stubs() + + +class _FakeFile: + """Minimal file-like stand-in supporting ``json.load``.""" + + def __init__(self, content: str) -> None: + self._content = content + + def read(self, *_args, **_kwargs) -> str: + return self._content + + +def _open_returning_payload(payload: dict): + """Build a context-manager mock for ``open`` that yields the JSON payload.""" + import json as _json + + fake_handle = MagicMock() + fake_handle.__enter__ = MagicMock(return_value=_FakeFile(_json.dumps(payload))) + fake_handle.__exit__ = MagicMock(return_value=False) + return MagicMock(return_value=fake_handle) + + +def _resolve_os_connection_class(): + """Return the real OSConnection class. + + ``@singleton`` from ``common.decorator`` wraps the class with a closure + that returns the cached instance on call. ``OSConnection`` at module + scope is therefore a function, not a type. We unwrap it to recover the + underlying class so we can call ``__new__`` directly without going through + ``__init__`` (which would attempt a real OpenSearch handshake). + """ + from rag.utils import opensearch_conn + + candidate = opensearch_conn.OSConnection + if isinstance(candidate, type): + return candidate + closure = getattr(candidate, "__closure__", None) or () + for cell in closure: + contents = cell.cell_contents + if isinstance(contents, type): + return contents + raise RuntimeError("Could not locate the OSConnection class in module scope") + + +def _make_os_connection(): + """Build an OSConnection without invoking its real network-dependent __init__.""" + cls = _resolve_os_connection_class() + instance = cls.__new__(cls) + instance.os = MagicMock() + instance.info = {"version": {"number": "2.18.0"}} + instance.mapping = {"settings": {}, "mappings": {}} + return instance + + +class TestOSConnectionMetaSurface: + """The OSConnection class must expose the dispatch surface + DocMetadataService relies on.""" + + def test_create_doc_meta_idx_exists(self): + cls = _resolve_os_connection_class() + assert callable(getattr(cls, "create_doc_meta_idx", None)), ( + "OSConnection.create_doc_meta_idx is required so the metadata " + "PATCH path does not raise AttributeError on OpenSearch backends " + "(issue #14570)." + ) + + def test_refresh_idx_exists(self): + cls = _resolve_os_connection_class() + assert callable(getattr(cls, "refresh_idx", None)) + + def test_count_idx_exists(self): + cls = _resolve_os_connection_class() + assert callable(getattr(cls, "count_idx", None)) + + def test_replace_meta_fields_exists(self): + cls = _resolve_os_connection_class() + assert callable(getattr(cls, "replace_meta_fields", None)) + + +class TestCreateDocMetaIdx: + """Behavioural tests for OSConnection.create_doc_meta_idx.""" + + def test_returns_true_when_index_already_exists(self): + conn = _make_os_connection() + with patch.object(_resolve_os_connection_class(), "index_exist", return_value=True) as exist: + assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is True + exist.assert_called_once_with("ragflow_doc_meta_t1", "") + + def test_creates_index_with_doc_meta_mapping(self): + conn = _make_os_connection() + fake_indices = MagicMock() + fake_indices.create.return_value = {"acknowledged": True} + cls = _resolve_os_connection_class() + + with patch.object(cls, "index_exist", return_value=False), \ + patch("rag.utils.opensearch_conn.os.path.exists", return_value=True), \ + patch( + "rag.utils.opensearch_conn.open", + new=_open_returning_payload({ + "settings": {"index": {"number_of_shards": 2}}, + "mappings": {"properties": {"meta_fields": {"type": "object"}}}, + }), + create=True, + ), \ + patch("opensearchpy.client.IndicesClient", return_value=fake_indices): + result = conn.create_doc_meta_idx("ragflow_doc_meta_t1") + + assert result == {"acknowledged": True} + fake_indices.create.assert_called_once() + kwargs = fake_indices.create.call_args.kwargs + assert kwargs["index"] == "ragflow_doc_meta_t1" + body = kwargs["body"] + assert "settings" in body and "mappings" in body + assert body["mappings"]["properties"]["meta_fields"]["type"] == "object" + + def test_returns_false_when_mapping_file_missing(self): + conn = _make_os_connection() + cls = _resolve_os_connection_class() + with patch.object(cls, "index_exist", return_value=False), \ + patch("rag.utils.opensearch_conn.os.path.exists", return_value=False): + assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is False + + def test_returns_false_when_create_call_explodes(self): + """If the underlying IndicesClient.create raises, the helper must + swallow the exception and return False so the service layer can fall + back gracefully (mirrors ESConnectionBase.create_doc_meta_idx).""" + conn = _make_os_connection() + cls = _resolve_os_connection_class() + fake_indices = MagicMock() + fake_indices.create.side_effect = RuntimeError("opensearch unreachable") + + with patch.object(cls, "index_exist", return_value=False), \ + patch("rag.utils.opensearch_conn.os.path.exists", return_value=True), \ + patch( + "rag.utils.opensearch_conn.open", + new=_open_returning_payload({"settings": {}, "mappings": {}}), + create=True, + ), \ + patch("opensearchpy.client.IndicesClient", return_value=fake_indices): + assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is False + + +class TestRefreshIdx: + def test_calls_indices_refresh(self): + conn = _make_os_connection() + assert conn.refresh_idx("ragflow_doc_meta_t1") is True + conn.os.indices.refresh.assert_called_once_with(index="ragflow_doc_meta_t1") + + def test_returns_false_on_not_found(self): + conn = _make_os_connection() + conn.os.indices.refresh.side_effect = opensearchpy.NotFoundError( + 404, "index_not_found_exception", {} + ) + assert conn.refresh_idx("missing_idx") is False + + def test_swallows_other_errors_and_returns_false(self): + conn = _make_os_connection() + conn.os.indices.refresh.side_effect = RuntimeError("transient") + assert conn.refresh_idx("ragflow_doc_meta_t1") is False + + +class TestCountIdx: + def test_returns_count_value(self): + conn = _make_os_connection() + conn.os.count.return_value = {"count": 42} + assert conn.count_idx("ragflow_doc_meta_t1") == 42 + conn.os.count.assert_called_once_with(index="ragflow_doc_meta_t1") + + def test_missing_index_reads_as_zero(self): + conn = _make_os_connection() + conn.os.count.side_effect = opensearchpy.NotFoundError( + 404, "index_not_found_exception", {} + ) + assert conn.count_idx("ragflow_doc_meta_t1") == 0 + + def test_other_failure_returns_negative_one(self): + conn = _make_os_connection() + conn.os.count.side_effect = RuntimeError("bad") + assert conn.count_idx("ragflow_doc_meta_t1") == -1 + + +class TestReplaceMetaFields: + def test_emits_full_assignment_script(self): + conn = _make_os_connection() + conn.os.update.return_value = {"_id": "doc-1", "result": "updated"} + meta = {"author": "alice", "year": 2026} + + ok = conn.replace_meta_fields("ragflow_doc_meta_t1", "doc-1", meta) + + assert ok is True + conn.os.update.assert_called_once() + kwargs = conn.os.update.call_args.kwargs + assert kwargs["index"] == "ragflow_doc_meta_t1" + assert kwargs["id"] == "doc-1" + assert kwargs["refresh"] is True + body = kwargs["body"] + # The script must fully assign meta_fields, otherwise removed keys + # would persist via deep merge. + assert body["script"]["source"] == "ctx._source.meta_fields = params.meta_fields" + assert body["script"]["params"]["meta_fields"] == meta + + def test_returns_false_when_doc_missing(self): + conn = _make_os_connection() + conn.os.update.side_effect = opensearchpy.NotFoundError( + 404, "document_missing_exception", {} + ) + assert conn.replace_meta_fields("ragflow_doc_meta_t1", "absent", {"a": 1}) is False