mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-04 18:45:38 +08:00
287 lines
11 KiB
Python
287 lines
11 KiB
Python
#
|
|
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
"""
|
|
Unit tests for the document-metadata helpers added to OSConnection.
|
|
|
|
Covers issue #14570: PATCH /api/v1/datasets/{ds}/documents/{doc} with
|
|
{"meta_fields": {...}} previously raised
|
|
``'OSConnection' object has no attribute 'create_doc_meta_idx'`` when the
|
|
backend was OpenSearch. These tests pin the new dispatch surface so the same
|
|
regression cannot return: every helper that DocMetadataService dispatches to
|
|
on the ES path must exist on OSConnection too, with semantically equivalent
|
|
behaviour.
|
|
|
|
The OpenSearch and Elasticsearch SDKs are imported at module load; mocking
|
|
the underlying client lets us exercise OSConnection methods in isolation
|
|
without a live cluster.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import types
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
|
|
# Importing OSConnection touches opensearchpy at module load, so guard for
|
|
# environments where the package isn't installed.
|
|
opensearchpy = pytest.importorskip("opensearchpy")
|
|
|
|
|
|
def _install_module(name: str, **attrs) -> types.ModuleType:
|
|
mod = sys.modules.get(name)
|
|
if mod is None:
|
|
mod = types.ModuleType(name)
|
|
sys.modules[name] = mod
|
|
for key, value in attrs.items():
|
|
if not hasattr(mod, key):
|
|
setattr(mod, key, value)
|
|
return mod
|
|
|
|
|
|
def _install_module_stubs() -> None:
|
|
"""Bypass heavy optional backends for connection-only tests.
|
|
|
|
``rag.utils.opensearch_conn`` imports ``common.settings`` and ``rag.nlp``
|
|
at module load. ``common.settings`` in turn pulls every storage backend
|
|
(Infinity, OceanBase, Azure, MinIO, GCS …), which is more surface than
|
|
these connection-only tests need. We replace just the modules opensearch_conn
|
|
captures so the real ``OSConnection`` class loads.
|
|
"""
|
|
_install_module(
|
|
"common.settings",
|
|
OS={"hosts": "stub", "username": "u", "password": "p"},
|
|
ES={},
|
|
DOC_ENGINE_INFINITY=False,
|
|
DOC_ENGINE_OCEANBASE=False,
|
|
DOC_ENGINE="opensearch",
|
|
docStoreConn=None,
|
|
)
|
|
_install_module(
|
|
"rag.nlp",
|
|
is_english=lambda *_args, **_kwargs: False,
|
|
rag_tokenizer=MagicMock(),
|
|
)
|
|
|
|
|
|
_install_module_stubs()
|
|
|
|
|
|
class _FakeFile:
|
|
"""Minimal file-like stand-in supporting ``json.load``."""
|
|
|
|
def __init__(self, content: str) -> None:
|
|
self._content = content
|
|
|
|
def read(self, *_args, **_kwargs) -> str:
|
|
return self._content
|
|
|
|
|
|
def _open_returning_payload(payload: dict):
|
|
"""Build a context-manager mock for ``open`` that yields the JSON payload."""
|
|
import json as _json
|
|
|
|
fake_handle = MagicMock()
|
|
fake_handle.__enter__ = MagicMock(return_value=_FakeFile(_json.dumps(payload)))
|
|
fake_handle.__exit__ = MagicMock(return_value=False)
|
|
return MagicMock(return_value=fake_handle)
|
|
|
|
|
|
def _resolve_os_connection_class():
|
|
"""Return the real OSConnection class.
|
|
|
|
``@singleton`` from ``common.decorator`` wraps the class with a closure
|
|
that returns the cached instance on call. ``OSConnection`` at module
|
|
scope is therefore a function, not a type. We unwrap it to recover the
|
|
underlying class so we can call ``__new__`` directly without going through
|
|
``__init__`` (which would attempt a real OpenSearch handshake).
|
|
"""
|
|
from rag.utils import opensearch_conn
|
|
|
|
candidate = opensearch_conn.OSConnection
|
|
if isinstance(candidate, type):
|
|
return candidate
|
|
closure = getattr(candidate, "__closure__", None) or ()
|
|
for cell in closure:
|
|
contents = cell.cell_contents
|
|
if isinstance(contents, type):
|
|
return contents
|
|
raise RuntimeError("Could not locate the OSConnection class in module scope")
|
|
|
|
|
|
def _make_os_connection():
|
|
"""Build an OSConnection without invoking its real network-dependent __init__."""
|
|
cls = _resolve_os_connection_class()
|
|
instance = cls.__new__(cls)
|
|
instance.os = MagicMock()
|
|
instance.info = {"version": {"number": "2.18.0"}}
|
|
instance.mapping = {"settings": {}, "mappings": {}}
|
|
return instance
|
|
|
|
|
|
class TestOSConnectionMetaSurface:
|
|
"""The OSConnection class must expose the dispatch surface
|
|
DocMetadataService relies on."""
|
|
|
|
def test_create_doc_meta_idx_exists(self):
|
|
cls = _resolve_os_connection_class()
|
|
assert callable(getattr(cls, "create_doc_meta_idx", None)), (
|
|
"OSConnection.create_doc_meta_idx is required so the metadata PATCH path does not raise AttributeError on OpenSearch backends (issue #14570)."
|
|
)
|
|
|
|
def test_refresh_idx_exists(self):
|
|
cls = _resolve_os_connection_class()
|
|
assert callable(getattr(cls, "refresh_idx", None))
|
|
|
|
def test_count_idx_exists(self):
|
|
cls = _resolve_os_connection_class()
|
|
assert callable(getattr(cls, "count_idx", None))
|
|
|
|
def test_replace_meta_fields_exists(self):
|
|
cls = _resolve_os_connection_class()
|
|
assert callable(getattr(cls, "replace_meta_fields", None))
|
|
|
|
|
|
class TestCreateDocMetaIdx:
|
|
"""Behavioural tests for OSConnection.create_doc_meta_idx."""
|
|
|
|
def test_returns_true_when_index_already_exists(self):
|
|
conn = _make_os_connection()
|
|
with patch.object(_resolve_os_connection_class(), "index_exist", return_value=True) as exist:
|
|
assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is True
|
|
exist.assert_called_once_with("ragflow_doc_meta_t1", "")
|
|
|
|
def test_creates_index_with_doc_meta_mapping(self):
|
|
conn = _make_os_connection()
|
|
fake_indices = MagicMock()
|
|
fake_indices.create.return_value = {"acknowledged": True}
|
|
cls = _resolve_os_connection_class()
|
|
|
|
with (
|
|
patch.object(cls, "index_exist", return_value=False),
|
|
patch("rag.utils.opensearch_conn.os.path.exists", return_value=True),
|
|
patch(
|
|
"rag.utils.opensearch_conn.open",
|
|
new=_open_returning_payload(
|
|
{
|
|
"settings": {"index": {"number_of_shards": 2}},
|
|
"mappings": {"properties": {"meta_fields": {"type": "object"}}},
|
|
}
|
|
),
|
|
create=True,
|
|
),
|
|
patch("opensearchpy.client.IndicesClient", return_value=fake_indices),
|
|
):
|
|
result = conn.create_doc_meta_idx("ragflow_doc_meta_t1")
|
|
|
|
assert result == {"acknowledged": True}
|
|
fake_indices.create.assert_called_once()
|
|
kwargs = fake_indices.create.call_args.kwargs
|
|
assert kwargs["index"] == "ragflow_doc_meta_t1"
|
|
body = kwargs["body"]
|
|
assert "settings" in body and "mappings" in body
|
|
assert body["mappings"]["properties"]["meta_fields"]["type"] == "object"
|
|
|
|
def test_returns_false_when_mapping_file_missing(self):
|
|
conn = _make_os_connection()
|
|
cls = _resolve_os_connection_class()
|
|
with patch.object(cls, "index_exist", return_value=False), patch("rag.utils.opensearch_conn.os.path.exists", return_value=False):
|
|
assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is False
|
|
|
|
def test_returns_false_when_create_call_explodes(self):
|
|
"""If the underlying IndicesClient.create raises, the helper must
|
|
swallow the exception and return False so the service layer can fall
|
|
back gracefully (mirrors ESConnectionBase.create_doc_meta_idx)."""
|
|
conn = _make_os_connection()
|
|
cls = _resolve_os_connection_class()
|
|
fake_indices = MagicMock()
|
|
fake_indices.create.side_effect = RuntimeError("opensearch unreachable")
|
|
|
|
with (
|
|
patch.object(cls, "index_exist", return_value=False),
|
|
patch("rag.utils.opensearch_conn.os.path.exists", return_value=True),
|
|
patch(
|
|
"rag.utils.opensearch_conn.open",
|
|
new=_open_returning_payload({"settings": {}, "mappings": {}}),
|
|
create=True,
|
|
),
|
|
patch("opensearchpy.client.IndicesClient", return_value=fake_indices),
|
|
):
|
|
assert conn.create_doc_meta_idx("ragflow_doc_meta_t1") is False
|
|
|
|
|
|
class TestRefreshIdx:
|
|
def test_calls_indices_refresh(self):
|
|
conn = _make_os_connection()
|
|
assert conn.refresh_idx("ragflow_doc_meta_t1") is True
|
|
conn.os.indices.refresh.assert_called_once_with(index="ragflow_doc_meta_t1")
|
|
|
|
def test_returns_false_on_not_found(self):
|
|
conn = _make_os_connection()
|
|
conn.os.indices.refresh.side_effect = opensearchpy.NotFoundError(404, "index_not_found_exception", {})
|
|
assert conn.refresh_idx("missing_idx") is False
|
|
|
|
def test_swallows_other_errors_and_returns_false(self):
|
|
conn = _make_os_connection()
|
|
conn.os.indices.refresh.side_effect = RuntimeError("transient")
|
|
assert conn.refresh_idx("ragflow_doc_meta_t1") is False
|
|
|
|
|
|
class TestCountIdx:
|
|
def test_returns_count_value(self):
|
|
conn = _make_os_connection()
|
|
conn.os.count.return_value = {"count": 42}
|
|
assert conn.count_idx("ragflow_doc_meta_t1") == 42
|
|
conn.os.count.assert_called_once_with(index="ragflow_doc_meta_t1")
|
|
|
|
def test_missing_index_reads_as_zero(self):
|
|
conn = _make_os_connection()
|
|
conn.os.count.side_effect = opensearchpy.NotFoundError(404, "index_not_found_exception", {})
|
|
assert conn.count_idx("ragflow_doc_meta_t1") == 0
|
|
|
|
def test_other_failure_returns_negative_one(self):
|
|
conn = _make_os_connection()
|
|
conn.os.count.side_effect = RuntimeError("bad")
|
|
assert conn.count_idx("ragflow_doc_meta_t1") == -1
|
|
|
|
|
|
class TestReplaceMetaFields:
|
|
def test_emits_full_assignment_script(self):
|
|
conn = _make_os_connection()
|
|
conn.os.update.return_value = {"_id": "doc-1", "result": "updated"}
|
|
meta = {"author": "alice", "year": 2026}
|
|
|
|
ok = conn.replace_meta_fields("ragflow_doc_meta_t1", "doc-1", meta)
|
|
|
|
assert ok is True
|
|
conn.os.update.assert_called_once()
|
|
kwargs = conn.os.update.call_args.kwargs
|
|
assert kwargs["index"] == "ragflow_doc_meta_t1"
|
|
assert kwargs["id"] == "doc-1"
|
|
assert kwargs["refresh"] is True
|
|
body = kwargs["body"]
|
|
# The script must fully assign meta_fields, otherwise removed keys
|
|
# would persist via deep merge.
|
|
assert body["script"]["source"] == "ctx._source.meta_fields = params.meta_fields"
|
|
assert body["script"]["params"]["meta_fields"] == meta
|
|
|
|
def test_returns_false_when_doc_missing(self):
|
|
conn = _make_os_connection()
|
|
conn.os.update.side_effect = opensearchpy.NotFoundError(404, "document_missing_exception", {})
|
|
assert conn.replace_meta_fields("ragflow_doc_meta_t1", "absent", {"a": 1}) is False
|