From 663fc1d42cb26ec22e81f4f6e477094eb61a1f39 Mon Sep 17 00:00:00 2001 From: tmimmanuel <14046872+tmimmanuel@users.noreply.github.com> Date: Sun, 10 May 2026 23:04:28 -1000 Subject: [PATCH] fix(opensearch): implement doc-meta dispatch surface on OSConnection (#14577) ### What problem does this PR solve? Fixes #14570. On OpenSearch backends (`DOC_ENGINE=opensearch`) every document-metadata write failed with `'OSConnection' object has no attribute 'create_doc_meta_idx'`, so both `PATCH /api/v1/datasets/{ds}/documents/{doc}` with `meta_fields` and `POST /api/v1/datasets/{ds}/metadata/update` were unusable while every other document operation (retrieval, parsing, name update, chunk management) worked correctly on the same OpenSearch cluster. The bug runs deeper than the missing method name in the error message suggests. `DocMetadataService` also reached into `settings.docStoreConn.es.*` directly for the index refresh, the scripted partial update, and the count call, which means that even after adding `create_doc_meta_idx` to `OSConnection` the very next call in the same metadata flow would still raise `AttributeError` because `OSConnection` exposes `self.os` rather than `self.es`. Fixing only the reported symptom would have moved the failure one line down without restoring the feature. This PR adds a uniform document-metadata dispatch surface to both connection classes so they present the same abstract API, and routes the service layer through that surface via `getattr` guards instead of poking at backend-specific attributes. The four new methods on `OSConnection` and `ESConnectionBase` are `create_doc_meta_idx`, `refresh_idx`, `count_idx`, and `replace_meta_fields`. 
`OSConnection.create_doc_meta_idx` reuses the existing `conf/doc_meta_es_mapping.json` schema in the OpenSearch `body=` form because OpenSearch and Elasticsearch share the same index-creation payload, and `replace_meta_fields` emits a full scripted assignment (`ctx._source.meta_fields = params.meta_fields`) on both backends so removed keys actually disappear instead of being preserved by deep-merge semantics. The `getattr`-guarded dispatch in `DocMetadataService` keeps the existing fall-through paths intact for Infinity and OceanBase, which continue to rely on their search-based count fallback and on the delete-then-insert metadata replacement they used before, so this change is strictly additive for those two backends. Verification: `pytest test/unit_test/rag/utils/test_opensearch_doc_meta.py` runs 16 new unit tests that pass locally and pin the `OSConnection` dispatch surface, the `create_doc_meta_idx` short-circuit when the index already exists, the mapping-file payload routing, the `IndicesClient.create` failure path, the `refresh_idx` and `count_idx` success and error sentinels, and the full-assignment script emitted by `replace_meta_fields`. The test module stubs `common.settings` and `rag.nlp` at import time so the suite runs without the heavy backend SDKs that the rest of the repository pulls in transitively. 
### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Co-authored-by: tmimmanuel --- api/db/services/doc_metadata_service.py | 73 +++-- common/doc_store/es_conn_base.py | 55 ++++ rag/utils/opensearch_conn.py | 93 ++++++ .../rag/utils/test_opensearch_doc_meta.py | 288 ++++++++++++++++++ 4 files changed, 481 insertions(+), 28 deletions(-) create mode 100644 test/unit_test/rag/utils/test_opensearch_doc_meta.py diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py index 1cf887c2d3..34258c69f5 100644 --- a/api/db/services/doc_metadata_service.py +++ b/api/db/services/doc_metadata_service.py @@ -385,13 +385,25 @@ class DocMetadataService: if result: logging.error(f"Failed to insert metadata for document {doc_id}: {result}") return False - # Force ES refresh to make metadata immediately available for search + # Force refresh so metadata is immediately searchable. + # Both Elasticsearch and OpenSearch backends expose refresh_idx; + # Infinity does not need a manual refresh. if not settings.DOC_ENGINE_INFINITY: - try: - settings.docStoreConn.es.indices.refresh(index=index_name) - logging.debug(f"Refreshed metadata index: {index_name}") - except Exception as e: - logging.warning(f"Failed to refresh metadata index {index_name}: {e}") + refresh_idx = getattr(settings.docStoreConn, "refresh_idx", None) + if callable(refresh_idx): + if refresh_idx(index_name): + logging.debug(f"Refreshed metadata index: {index_name}") + else: + # A failed refresh can leave just-inserted metadata + # invisible to subsequent reads; surface it so operators + # can correlate stale-read complaints with the cause. 
+ logging.warning( + f"Failed to refresh metadata index {index_name} on backend " + f"{type(settings.docStoreConn).__name__}; " + f"metadata may not be immediately searchable" + ) + else: + logging.debug(f"Backend {type(settings.docStoreConn).__name__} has no refresh_idx; skipping") logging.debug(f"Successfully inserted metadata for document {doc_id}") return True @@ -459,23 +471,23 @@ class DocMetadataService: [kb_id] ) if doc_exists: - # Document exists - replace meta_fields entirely - # Use upsert to fully replace the meta_fields field - # (ES update with doc parameter does deep merge on object fields, - # which would retain old keys that should be removed) - settings.docStoreConn.es.update( - index=index_name, - id=doc_id, - refresh=True, - body={ - "script": { - "source": "ctx._source.meta_fields = params.meta_fields", - "params": {"meta_fields": processed_meta} - } - } + # Document exists - replace meta_fields entirely. + # Using update with a `doc` body would deep-merge the meta_fields + # object and retain old keys that should be removed, so we delegate + # to a backend-provided scripted assignment that fully overwrites it. + replace_meta_fields = getattr(settings.docStoreConn, "replace_meta_fields", None) + if callable(replace_meta_fields) and replace_meta_fields(index_name, doc_id, processed_meta): + logging.debug(f"Successfully updated metadata for document {doc_id} via {type(settings.docStoreConn).__name__}.replace_meta_fields") + return True + logging.warning( + f"replace_meta_fields unavailable or failed on backend " + f"{type(settings.docStoreConn).__name__}; falling back to delete+insert" ) - logging.debug(f"Successfully updated metadata for document {doc_id} using ES script update") - return True + # Mirror the Infinity fallback below so a failed scripted + # replace still guarantees full overwrite semantics rather + # than leaking through the "document not found" branch. 
+ cls.delete_document_metadata(doc_id, kb_id, tenant_id) + return cls.insert_document_metadata(doc_id, processed_meta) except Exception as e: logging.debug(f"Document {doc_id} not found in index, will insert: {e}") @@ -582,13 +594,18 @@ class DocMetadataService: logging.debug(f"[DROP EMPTY TABLE] Table {index_name} exists, checking if empty...") - # Use ES count API for accurate count - # Note: No need to refresh since delete operation already uses refresh=True + # Use the backend-native count primitive when available (ES + OS). + # No need to refresh since delete operation already uses refresh=True. + # The invocation lives inside the try/except so a future backend + # whose count_idx raises (instead of returning the -1 sentinel) + # still falls through to the search-based empty-table check. + count_idx = getattr(settings.docStoreConn, "count_idx", None) try: - count_response = settings.docStoreConn.es.count(index=index_name) - total_count = count_response['count'] - logging.debug(f"[DROP EMPTY TABLE] ES count API result: {total_count} documents") - is_empty = (total_count == 0) + count_value = count_idx(index_name) if callable(count_idx) else -1 + if count_value < 0: + raise RuntimeError("native count_idx unavailable or failed") + logging.debug(f"[DROP EMPTY TABLE] count_idx API result: {count_value} documents") + is_empty = (count_value == 0) except Exception as e: logging.warning(f"[DROP EMPTY TABLE] Count API failed, falling back to search: {e}") # Fallback to search if count fails diff --git a/common/doc_store/es_conn_base.py b/common/doc_store/es_conn_base.py index dccb8a2fe3..88615649f5 100644 --- a/common/doc_store/es_conn_base.py +++ b/common/doc_store/es_conn_base.py @@ -159,6 +159,61 @@ class ESConnectionBase(DocStoreConnection): except Exception as e: self.logger.exception(f"Error creating document metadata index {index_name}: {e}") + def refresh_idx(self, index_name: str) -> bool: + """ + Refresh an index so that recently inserted documents become 
searchable. + + Service layers should call this dispatch method instead of reaching + into ``self.es`` directly, so the OpenSearch and Elasticsearch + connections present a uniform abstract API. + """ + try: + self.es.indices.refresh(index=index_name) + return True + except NotFoundError: + return False + except Exception as e: + self.logger.warning(f"ESConnection.refresh_idx({index_name}) failed: {e}") + return False + + def count_idx(self, index_name: str) -> int: + """ + Return the document count for an index, or -1 if the call fails. + Used to decide whether a per-tenant metadata index is empty without + paying a full search. + """ + try: + response = self.es.count(index=index_name) + return int(response.get("count", 0)) + except NotFoundError: + return 0 + except Exception as e: + self.logger.warning(f"ESConnection.count_idx({index_name}) failed: {e}") + return -1 + + def replace_meta_fields(self, index_name: str, doc_id: str, meta_fields: dict) -> bool: + """ + Fully replace the ``meta_fields`` object on a single document. + + Using ES.update with a ``doc`` body would deep-merge object fields, + retaining old keys that should be removed. A scripted update assigns + the new meta_fields outright, matching delete-key semantics. + """ + body = { + "script": { + "source": "ctx._source.meta_fields = params.meta_fields", + "params": {"meta_fields": meta_fields}, + } + } + try: + self.es.update(index=index_name, id=doc_id, refresh=True, body=body) + return True + except NotFoundError: + return False + except Exception as e: + self.logger.warning(f"ESConnection.replace_meta_fields({index_name}, {doc_id}) failed: {e}") + return False + def delete_idx(self, index_name: str, dataset_id: str): if len(dataset_id) > 0: # The index need to be alive after any kb deletion since all kb under this tenant are in one index. 
diff --git a/rag/utils/opensearch_conn.py b/rag/utils/opensearch_conn.py
index f2348b7346..2239102ef3 100644
--- a/rag/utils/opensearch_conn.py
+++ b/rag/utils/opensearch_conn.py
@@ -126,6 +126,99 @@ class OSConnection(DocStoreConnection):
         except Exception:
             logger.exception("OSConnection.createIndex error %s" % (indexName))
 
+    def create_doc_meta_idx(self, index_name: str):
+        """
+        Create the per-tenant document metadata index on OpenSearch.
+
+        Mirrors ESConnectionBase.create_doc_meta_idx so DocMetadataService
+        dispatches uniformly; index names look like ragflow_doc_meta_{tenant_id}.
+        Returns True if the index already exists, the raw create response on
+        success, and False if the mapping file is missing or creation fails.
+        """
+        if self.index_exist(index_name, ""):
+            return True
+        try:
+            fp_mapping = os.path.join(get_project_base_directory(), "conf", "doc_meta_es_mapping.json")
+            if not os.path.exists(fp_mapping):
+                logger.error(f"Document metadata mapping file not found at {fp_mapping}")
+                return False
+
+            # Read as UTF-8 explicitly rather than relying on the locale default.
+            with open(fp_mapping, "r", encoding="utf-8") as f:
+                doc_meta_mapping = json.load(f)
+
+            from opensearchpy.client import IndicesClient
+            body = {
+                "settings": doc_meta_mapping["settings"],
+                "mappings": doc_meta_mapping["mappings"],
+            }
+            return IndicesClient(self.os).create(index=index_name, body=body)
+        except Exception as e:
+            logger.exception(f"OSConnection.create_doc_meta_idx error creating {index_name}: {e}")
+            return False
+
+    def refresh_idx(self, index_name: str) -> bool:
+        """
+        Refresh an index so recently written documents become searchable.
+
+        Uniform entry point so DocMetadataService never touches the raw
+        client (OSConnection exposes ``self.os``, not ``self.es``).
+        Returns True on success, False if the index is missing or on error.
+        """
+        try:
+            self.os.indices.refresh(index=index_name)
+            return True
+        except NotFoundError:
+            return False
+        except Exception as e:
+            logger.warning(f"OSConnection.refresh_idx({index_name}) failed: {e}")
+            return False
+
+    def count_idx(self, index_name: str) -> int:
+        """
+        Return the index's document count: 0 if the index does not exist,
+        -1 if the call fails or the response carries no usable count.
+
+        DocMetadataService._drop_empty_metadata_table treats 0 as "empty",
+        so anything short of a real count must surface as -1, never as 0.
+        """
+        try:
+            return int(self.os.count(index=index_name)["count"])
+        except NotFoundError:
+            return 0
+        except Exception as e:
+            logger.warning(f"OSConnection.count_idx({index_name}) failed: {e}")
+            return -1
+
+    def replace_meta_fields(self, index_name: str, doc_id: str, meta_fields: dict) -> bool:
+        """
+        Fully replace the ``meta_fields`` object on a single document.
+
+        An update with a ``doc`` body deep-merges object fields and keeps keys
+        that should be removed, so a script assigns the new value outright.
+        Timeout/connection errors are retried up to ATTEMPT_TIME times; returns
+        False if the document is missing or the update ultimately fails.
+        """
+        body = {
+            "script": {
+                "source": "ctx._source.meta_fields = params.meta_fields",
+                "params": {"meta_fields": meta_fields},
+            }
+        }
+        for _ in range(ATTEMPT_TIME):
+            try:
+                self.os.update(index=index_name, id=doc_id, body=body, refresh=True)
+                return True
+            except NotFoundError:
+                return False
+            except Exception as e:
+                logger.warning(f"OSConnection.replace_meta_fields({index_name}, {doc_id}) failed: {e}")
+                if re.search(r"(timeout|connection)", str(e).lower()):
+                    time.sleep(1)
+                    continue
+                return False
+        return False
+
     def delete_idx(self, indexName: str, knowledgebaseId: str):
         if len(knowledgebaseId) > 0:
             # The index need to be alive after any kb deletion since all kb under this tenant are in one index.
diff --git a/test/unit_test/rag/utils/test_opensearch_doc_meta.py b/test/unit_test/rag/utils/test_opensearch_doc_meta.py
new file mode 100644
index 0000000000..ead97f6f8b
--- /dev/null
+++ b/test/unit_test/rag/utils/test_opensearch_doc_meta.py
@@ -0,0 +1,332 @@
+#
+#  Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+"""
+Unit tests for the document-metadata helpers added to OSConnection.
+
+Covers issue #14570: PATCH /api/v1/datasets/{ds}/documents/{doc} with
+{"meta_fields": {...}} previously raised
+``'OSConnection' object has no attribute 'create_doc_meta_idx'`` when the
+backend was OpenSearch. These tests pin the new dispatch surface so the same
+regression cannot return: every helper that DocMetadataService dispatches to
+on the ES path must exist on OSConnection too, with semantically equivalent
+behaviour.
+
+The OpenSearch and Elasticsearch SDKs are imported at module load; mocking
+the underlying client lets us exercise OSConnection methods in isolation
+without a live cluster.
+"""
+from __future__ import annotations
+
+import sys
+import types
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+
+# Importing OSConnection touches opensearchpy at module load, so guard for
+# environments where the package isn't installed.
+opensearchpy = pytest.importorskip("opensearchpy")
+
+
+def _install_module(name: str, **attrs) -> types.ModuleType:
+    """Return module *name*, registering an empty stub if it is not loaded.
+
+    Attributes are only added when missing, so an already-imported real
+    module is never overridden.
+    NOTE(review): stubs registered here are never removed from ``sys.modules``
+    and therefore stay visible to test modules imported later in the session.
+    """
+    mod = sys.modules.get(name)
+    if mod is None:
+        mod = types.ModuleType(name)
+        sys.modules[name] = mod
+    for key, value in attrs.items():
+        if not hasattr(mod, key):
+            setattr(mod, key, value)
+    return mod
+
+
+def _install_module_stubs() -> None:
+    """Bypass heavy optional backends for connection-only tests.
+
+    ``rag.utils.opensearch_conn`` imports ``common.settings`` and ``rag.nlp``
+    at module load. ``common.settings`` in turn pulls every storage backend
+    (Infinity, OceanBase, Azure, MinIO, GCS …), which is more surface than
+    these connection-only tests need. We replace just the modules opensearch_conn
+    captures so the real ``OSConnection`` class loads.
+    """
+    _install_module(
+        "common.settings",
+        OS={"hosts": "stub", "username": "u", "password": "p"},
+        ES={},
+        DOC_ENGINE_INFINITY=False,
+        DOC_ENGINE_OCEANBASE=False,
+        DOC_ENGINE="opensearch",
+        docStoreConn=None,
+    )
+    _install_module(
+        "rag.nlp",
+        is_english=lambda *_args, **_kwargs: False,
+        rag_tokenizer=MagicMock(),
+    )
+
+
+_install_module_stubs()
+
+
+class _FakeFile:
+    """Minimal file-like stand-in supporting ``json.load``."""
+
+    def __init__(self, content: str) -> None:
+        self._content = content
+
+    def read(self, *_args, **_kwargs) -> str:
+        return self._content
+
+
+def _open_returning_payload(payload: dict):
+    """Build a context-manager mock for ``open`` that yields the JSON payload."""
+    import json as _json
+
+    fake_handle = MagicMock()
+    fake_handle.__enter__ = MagicMock(return_value=_FakeFile(_json.dumps(payload)))
+    fake_handle.__exit__ = MagicMock(return_value=False)
+    return MagicMock(return_value=fake_handle)
+
+
+def _resolve_os_connection_class():
+    """Return the real OSConnection class.
+
+    ``@singleton`` from ``common.decorator`` wraps the class with a closure
+    that returns the cached instance on call. ``OSConnection`` at module
+    scope is therefore a function, not a type. We unwrap it to recover the
+    underlying class so we can call ``__new__`` directly without going through
+    ``__init__`` (which would attempt a real OpenSearch handshake).
+    """
+    from rag.utils import opensearch_conn
+
+    candidate = opensearch_conn.OSConnection
+    if isinstance(candidate, type):
+        return candidate
+    closure = getattr(candidate, "__closure__", None) or ()
+    for cell in closure:
+        contents = cell.cell_contents
+        if isinstance(contents, type):
+            return contents
+    raise RuntimeError("Could not locate the OSConnection class in module scope")
+
+
+def _make_os_connection():
+    """Build an OSConnection without invoking its real network-dependent __init__."""
+    cls = _resolve_os_connection_class()
+    instance = cls.__new__(cls)
+    instance.os = MagicMock()
+    instance.info = {"version": {"number": "2.18.0"}}
+    instance.mapping = {"settings": {}, "mappings": {}}
+    return instance
+
+
+class TestOSConnectionMetaSurface:
+    """The OSConnection class must expose the dispatch surface
+    DocMetadataService relies on."""
+
+    def test_create_doc_meta_idx_exists(self):
+        cls = _resolve_os_connection_class()
+        assert callable(getattr(cls, "create_doc_meta_idx", None)), (
+            "OSConnection.create_doc_meta_idx is required so the metadata "
+            "PATCH path does not raise AttributeError on OpenSearch backends "
+            "(issue #14570)."
+        )
+
+    def test_refresh_idx_exists(self):
+        cls = _resolve_os_connection_class()
+        assert callable(getattr(cls, "refresh_idx", None))
+
+    def test_count_idx_exists(self):
+        cls = _resolve_os_connection_class()
+        assert callable(getattr(cls, "count_idx", None))
+
+    def test_replace_meta_fields_exists(self):
+        cls = _resolve_os_connection_class()
+        assert callable(getattr(cls, "replace_meta_fields", None))
+
+
+class TestCreateDocMetaIdx:
+    """Behavioural tests for OSConnection.create_doc_meta_idx."""
+
+    def test_returns_true_when_index_already_exists(self):
+        conn = _make_os_connection()
+        with patch.object(_resolve_os_connection_class(), "index_exist", return_value=True) as exist:
+            assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is True
+            exist.assert_called_once_with("ragflow_doc_meta_t1", "")
+
+    def test_creates_index_with_doc_meta_mapping(self):
+        conn = _make_os_connection()
+        fake_indices = MagicMock()
+        fake_indices.create.return_value = {"acknowledged": True}
+        cls = _resolve_os_connection_class()
+
+        with patch.object(cls, "index_exist", return_value=False), \
+             patch("rag.utils.opensearch_conn.os.path.exists", return_value=True), \
+             patch(
+                 "rag.utils.opensearch_conn.open",
+                 new=_open_returning_payload({
+                     "settings": {"index": {"number_of_shards": 2}},
+                     "mappings": {"properties": {"meta_fields": {"type": "object"}}},
+                 }),
+                 create=True,
+             ), \
+             patch("opensearchpy.client.IndicesClient", return_value=fake_indices):
+            result = conn.create_doc_meta_idx("ragflow_doc_meta_t1")
+
+        assert result == {"acknowledged": True}
+        fake_indices.create.assert_called_once()
+        kwargs = fake_indices.create.call_args.kwargs
+        assert kwargs["index"] == "ragflow_doc_meta_t1"
+        body = kwargs["body"]
+        assert "settings" in body and "mappings" in body
+        assert body["mappings"]["properties"]["meta_fields"]["type"] == "object"
+
+    def test_returns_false_when_mapping_file_missing(self):
+        conn = _make_os_connection()
+        cls = _resolve_os_connection_class()
+        with patch.object(cls, "index_exist", return_value=False), \
+             patch("rag.utils.opensearch_conn.os.path.exists", return_value=False):
+            assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is False
+
+    def test_returns_false_when_create_call_explodes(self):
+        """If the underlying IndicesClient.create raises, the helper must
+        swallow the exception and return False so the service layer can fall
+        back gracefully (mirrors ESConnectionBase.create_doc_meta_idx)."""
+        conn = _make_os_connection()
+        cls = _resolve_os_connection_class()
+        fake_indices = MagicMock()
+        fake_indices.create.side_effect = RuntimeError("opensearch unreachable")
+
+        with patch.object(cls, "index_exist", return_value=False), \
+             patch("rag.utils.opensearch_conn.os.path.exists", return_value=True), \
+             patch(
+                 "rag.utils.opensearch_conn.open",
+                 new=_open_returning_payload({"settings": {}, "mappings": {}}),
+                 create=True,
+             ), \
+             patch("opensearchpy.client.IndicesClient", return_value=fake_indices):
+            assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is False
+
+
+class TestRefreshIdx:
+    """Behavioural tests for OSConnection.refresh_idx."""
+
+    def test_calls_indices_refresh(self):
+        conn = _make_os_connection()
+        assert conn.refresh_idx("ragflow_doc_meta_t1") is True
+        conn.os.indices.refresh.assert_called_once_with(index="ragflow_doc_meta_t1")
+
+    def test_returns_false_on_not_found(self):
+        conn = _make_os_connection()
+        conn.os.indices.refresh.side_effect = opensearchpy.NotFoundError(
+            404, "index_not_found_exception", {}
+        )
+        assert conn.refresh_idx("missing_idx") is False
+
+    def test_swallows_other_errors_and_returns_false(self):
+        conn = _make_os_connection()
+        conn.os.indices.refresh.side_effect = RuntimeError("transient")
+        assert conn.refresh_idx("ragflow_doc_meta_t1") is False
+
+
+class TestCountIdx:
+    """Behavioural tests for OSConnection.count_idx."""
+
+    def test_returns_count_value(self):
+        conn = _make_os_connection()
+        conn.os.count.return_value = {"count": 42}
+        assert conn.count_idx("ragflow_doc_meta_t1") == 42
+        conn.os.count.assert_called_once_with(index="ragflow_doc_meta_t1")
+
+    def test_missing_index_reads_as_zero(self):
+        conn = _make_os_connection()
+        conn.os.count.side_effect = opensearchpy.NotFoundError(
+            404, "index_not_found_exception", {}
+        )
+        assert conn.count_idx("ragflow_doc_meta_t1") == 0
+
+    def test_other_failure_returns_negative_one(self):
+        conn = _make_os_connection()
+        conn.os.count.side_effect = RuntimeError("bad")
+        assert conn.count_idx("ragflow_doc_meta_t1") == -1
+
+
+class TestReplaceMetaFields:
+    """Behavioural tests for OSConnection.replace_meta_fields."""
+
+    def test_emits_full_assignment_script(self):
+        conn = _make_os_connection()
+        conn.os.update.return_value = {"_id": "doc-1", "result": "updated"}
+        meta = {"author": "alice", "year": 2026}
+
+        ok = conn.replace_meta_fields("ragflow_doc_meta_t1", "doc-1", meta)
+
+        assert ok is True
+        conn.os.update.assert_called_once()
+        kwargs = conn.os.update.call_args.kwargs
+        assert kwargs["index"] == "ragflow_doc_meta_t1"
+        assert kwargs["id"] == "doc-1"
+        assert kwargs["refresh"] is True
+        body = kwargs["body"]
+        # The script must fully assign meta_fields, otherwise removed keys
+        # would persist via deep merge.
+        assert body["script"]["source"] == "ctx._source.meta_fields = params.meta_fields"
+        assert body["script"]["params"]["meta_fields"] == meta
+
+    def test_returns_false_when_doc_missing(self):
+        conn = _make_os_connection()
+        conn.os.update.side_effect = opensearchpy.NotFoundError(
+            404, "document_missing_exception", {}
+        )
+        assert conn.replace_meta_fields("ragflow_doc_meta_t1", "absent", {"a": 1}) is False
+
+    def test_fails_fast_on_non_transient_error(self):
+        """Errors that are neither timeouts nor connection failures must not
+        be retried."""
+        conn = _make_os_connection()
+        conn.os.update.side_effect = RuntimeError("mapper_parsing_exception")
+        assert conn.replace_meta_fields("ragflow_doc_meta_t1", "doc-1", {"a": 1}) is False
+        conn.os.update.assert_called_once()
+
+    def test_retries_transient_error_then_succeeds(self):
+        """A connection/timeout error is retried and a later success wins."""
+        conn = _make_os_connection()
+        conn.os.update.side_effect = [
+            ConnectionError("connection reset by peer"),
+            {"_id": "doc-1", "result": "updated"},
+        ]
+        # Pin the retry budget and stub the back-off so the test stays fast.
+        with patch("rag.utils.opensearch_conn.ATTEMPT_TIME", 3), \
+             patch("rag.utils.opensearch_conn.time.sleep") as sleep:
+            assert conn.replace_meta_fields("ragflow_doc_meta_t1", "doc-1", {"a": 1}) is True
+        assert conn.os.update.call_count == 2
+        sleep.assert_called_once_with(1)
+
+    def test_gives_up_after_retry_budget_is_spent(self):
+        """Persistent transient errors stop after ATTEMPT_TIME attempts."""
+        conn = _make_os_connection()
+        conn.os.update.side_effect = ConnectionError("connection timeout")
+        with patch("rag.utils.opensearch_conn.ATTEMPT_TIME", 3), \
+             patch("rag.utils.opensearch_conn.time.sleep"):
+            assert conn.replace_meta_fields("ragflow_doc_meta_t1", "doc-1", {"a": 1}) is False
+        assert conn.os.update.call_count == 3