fix(opensearch): implement doc-meta dispatch surface on OSConnection (#14577)

### What problem does this PR solve?

Fixes #14570. On OpenSearch backends (`DOC_ENGINE=opensearch`) every
document-metadata write failed with `'OSConnection' object has no
attribute 'create_doc_meta_idx'`, so both `PATCH
/api/v1/datasets/{ds}/documents/{doc}` with `meta_fields` and `POST
/api/v1/datasets/{ds}/metadata/update` were unusable while every other
document operation (retrieval, parsing, name update, chunk management)
worked correctly on the same OpenSearch cluster.

The bug runs deeper than the missing method name in the error message
suggests. `DocMetadataService` also reached into
`settings.docStoreConn.es.*` directly for the index refresh, the
scripted partial update, and the count call, which means that even after
adding `create_doc_meta_idx` to `OSConnection` the very next call in the
same metadata flow would still raise `AttributeError` because
`OSConnection` exposes `self.os` rather than `self.es`. Fixing only the
reported symptom would have moved the failure one line down without
restoring the feature.

This PR adds a uniform document-metadata dispatch surface to both
connection classes so they present the same abstract API, and routes the
service layer through that surface via `getattr` guards instead of
poking at backend-specific attributes. The four new methods on
`OSConnection` and `ESConnectionBase` are `create_doc_meta_idx`,
`refresh_idx`, `count_idx`, and `replace_meta_fields`.
`OSConnection.create_doc_meta_idx` reuses the existing
`conf/doc_meta_es_mapping.json` schema in the OpenSearch `body=` form
because OpenSearch and Elasticsearch share the same index-creation
payload, and `replace_meta_fields` emits a full scripted assignment
(`ctx._source.meta_fields = params.meta_fields`) on both backends so
removed keys actually disappear instead of being preserved by deep-merge
semantics.
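
A minimal sketch of why the full assignment matters, using plain dictionaries rather than a live cluster. The helpers `deep_merge` and `full_assignment` are hypothetical stand-ins for a `doc`-body partial update and for the scripted assignment respectively; they only approximate the backend behaviour described above.

```python
from copy import deepcopy


def deep_merge(stored: dict, patch: dict) -> dict:
    """Approximate a partial update with a ``doc`` body: nested objects merge key by key."""
    merged = deepcopy(stored)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def full_assignment(stored: dict, meta_fields: dict) -> dict:
    """Approximate ``ctx._source.meta_fields = params.meta_fields``."""
    updated = deepcopy(stored)
    updated["meta_fields"] = deepcopy(meta_fields)
    return updated


stored = {"meta_fields": {"author": "alice", "year": 2024}}
new_meta = {"author": "bob"}  # the caller wants "year" removed

merged = deep_merge(stored, {"meta_fields": new_meta})
replaced = full_assignment(stored, new_meta)

print(merged)    # "year" is still present after the merge
print(replaced)  # "year" is gone after the full assignment
```

In this sketch the merge keeps the stale `year` key, while the assignment leaves only what the caller sent.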

The `getattr`-guarded dispatch in `DocMetadataService` keeps the
existing fall-through paths intact for Infinity and OceanBase, which
continue to rely on their search-based count fallback and on the
delete-then-insert metadata replacement they used before, so this change
is strictly additive for those two backends.
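
The guard pattern can be sketched roughly as follows. This is not the code in `DocMetadataService`: the function `count_meta_docs`, the `search_fallback` callable, and both fake connection classes are assumptions made up for illustration. Only the `count_idx` method name comes from this PR.

```python
from typing import Any, Callable


def count_meta_docs(conn: Any, index_name: str, search_fallback: Callable[[str], int]) -> int:
    """Use the connection's ``count_idx`` when present, else a search-based fallback."""
    count_idx = getattr(conn, "count_idx", None)
    if callable(count_idx):
        return count_idx(index_name)
    # Backends without the dispatch surface keep their previous behaviour.
    return search_fallback(index_name)


class FakeOpenSearchConn:
    """Hypothetical stand-in for a connection that implements the dispatch surface."""

    def count_idx(self, index_name: str) -> int:
        return 3


class FakeInfinityConn:
    """Hypothetical stand-in for a backend that does not define ``count_idx``."""


with_surface = count_meta_docs(FakeOpenSearchConn(), "meta_idx", lambda name: 0)
without_surface = count_meta_docs(FakeInfinityConn(), "meta_idx", lambda name: 7)
```

Because the lookup goes through `getattr` with a default, a connection that lacks the method never raises `AttributeError`; it simply takes the fallback branch.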

Verification: `pytest
test/unit_test/rag/utils/test_opensearch_doc_meta.py` runs 16 new unit
tests that pass locally and pin the `OSConnection` dispatch surface, the
`create_doc_meta_idx` short-circuit when the index already exists, the
mapping-file payload routing, the `IndicesClient.create` failure path,
the `refresh_idx` and `count_idx` success and error sentinels, and the
full-assignment script emitted by `replace_meta_fields`. The test module
stubs `common.settings` and `rag.nlp` at import time so the suite runs
without the heavy backend SDKs that the rest of the repository pulls in
transitively.
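
For readers unfamiliar with import-time stubbing, the idea is roughly the following. The helper `install_stub` is hypothetical and the `docStoreConn=None` attribute is an assumption; the actual test module may set up its stubs differently.

```python
import sys
import types


def install_stub(dotted_name: str, **attrs) -> types.ModuleType:
    """Register a placeholder module (and any missing parent packages) in sys.modules."""
    parts = dotted_name.split(".")
    for i in range(1, len(parts) + 1):
        name = ".".join(parts[:i])
        if name not in sys.modules:
            module = types.ModuleType(name)
            if i < len(parts):
                module.__path__ = []  # mark parent stubs as packages
            sys.modules[name] = module
            if i > 1:
                # Expose the child as an attribute of its parent package.
                setattr(sys.modules[".".join(parts[:i - 1])], parts[i - 1], module)
    stub = sys.modules[dotted_name]
    for key, value in attrs.items():
        setattr(stub, key, value)
    return stub


# Stubs must be installed before the code under test is imported.
install_stub("common.settings", docStoreConn=None)
install_stub("rag.nlp")

from common import settings  # resolves to the stub, not the real package
import rag.nlp
```

Once the placeholders sit in `sys.modules`, later `import` statements for those names resolve to them and the real packages are never loaded.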


### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Co-authored-by: tmimmanuel <tmimmanuel@users.noreply.github.com>
tmimmanuel
2026-05-10 23:04:28 -10:00
committed by GitHub
parent 292b0b8bce
commit 663fc1d42c
4 changed files with 481 additions and 28 deletions


@@ -159,6 +159,61 @@ class ESConnectionBase(DocStoreConnection):
        except Exception as e:
            self.logger.exception(f"Error creating document metadata index {index_name}: {e}")

    def refresh_idx(self, index_name: str) -> bool:
        """
        Refresh an index so that recently inserted documents become searchable.
        Service layers should call this dispatch method instead of reaching
        into ``self.es`` directly, so the OpenSearch and Elasticsearch
        connections present a uniform abstract API.
        """
        try:
            self.es.indices.refresh(index=index_name)
            return True
        except NotFoundError:
            return False
        except Exception as e:
            self.logger.warning(f"ESConnection.refresh_idx({index_name}) failed: {e}")
            return False

    def count_idx(self, index_name: str) -> int:
        """
        Return the document count for an index, or -1 if the call fails.
        Used to decide whether a per-tenant metadata index is empty without
        paying a full search.
        """
        try:
            response = self.es.count(index=index_name)
            return int(response.get("count", 0))
        except NotFoundError:
            return 0
        except Exception as e:
            self.logger.warning(f"ESConnection.count_idx({index_name}) failed: {e}")
            return -1

    def replace_meta_fields(self, index_name: str, doc_id: str, meta_fields: dict) -> bool:
        """
        Fully replace the ``meta_fields`` object on a single document.
        Using ES.update with a ``doc`` body would deep-merge object fields,
        retaining old keys that should be removed. A scripted update assigns
        the new meta_fields outright, matching delete-key semantics.
        """
        body = {
            "script": {
                "source": "ctx._source.meta_fields = params.meta_fields",
                "params": {"meta_fields": meta_fields},
            }
        }
        try:
            self.es.update(index=index_name, id=doc_id, refresh=True, body=body)
            return True
        except NotFoundError:
            return False
        except Exception as e:
            self.logger.warning(f"ESConnection.replace_meta_fields({index_name}, {doc_id}) failed: {e}")
            return False

    def delete_idx(self, index_name: str, dataset_id: str):
        if len(dataset_id) > 0:
            # The index need to be alive after any kb deletion since all kb under this tenant are in one index.