2026-04-30 18:13:27 +03:00
#
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
# Module-level logger named after this module (`__name__`); used below for
# diagnostics such as the malformed `reference_metadata.fields` warning.
logger = logging . getLogger ( __name__ )
def resolve_reference_metadata_preferences (
request_payload : dict | None = None ,
config_payload : dict | None = None ,
) - > tuple [ bool , set [ str ] | None ] :
"""
Resolve metadata include / fields from request and optional config .
Request values take precedence over config values .
Supports legacy request keys : include_metadata / metadata_fields .
"""
request_payload = request_payload or { }
config_payload = config_payload or { }
config_ref = config_payload . get ( " reference_metadata " , { } )
request_ref = request_payload . get ( " reference_metadata " , { } )
resolved : dict = { }
if isinstance ( config_ref , dict ) :
resolved . update ( config_ref )
if isinstance ( request_ref , dict ) :
resolved . update ( request_ref )
if " include_metadata " in request_payload :
resolved [ " include " ] = bool ( request_payload . get ( " include_metadata " ) )
if " metadata_fields " in request_payload :
resolved [ " fields " ] = request_payload . get ( " metadata_fields " )
include_metadata = bool ( resolved . get ( " include " , False ) )
fields = resolved . get ( " fields " )
if fields is None :
return include_metadata , None
if not isinstance ( fields , list ) :
logger . warning (
2026-07-03 12:53:39 +08:00
" reference_metadata.fields is not a list; include_metadata= %s fields= %r type= %s resolved= %r . enrich_chunks_with_document_metadata will skip enrichment. " ,
2026-04-30 18:13:27 +03:00
include_metadata ,
fields ,
type ( fields ) . __name__ ,
resolved ,
)
return include_metadata , set ( )
return include_metadata , { f for f in fields if isinstance ( f , str ) }
def enrich_chunks_with_document_metadata (
chunks : list [ dict ] ,
metadata_fields : set [ str ] | None = None ,
* ,
kb_field : str = " kb_id " ,
doc_field : str = " doc_id " ,
output_field : str = " document_metadata " ,
) - > None :
"""
Mutates chunk payloads in - place by attaching ` document_metadata ` .
Field names can be customized for different chunk schemas .
"""
if metadata_fields is not None and not metadata_fields :
return
doc_ids_by_kb : dict [ str , set [ str ] ] = { }
for chunk in chunks :
kb_ids = chunk . get ( kb_field )
doc_id = chunk . get ( doc_field )
if not kb_ids or not doc_id :
continue
if isinstance ( kb_ids , ( list , tuple ) ) :
for kid in kb_ids :
if kid :
doc_ids_by_kb . setdefault ( kid , set ( ) ) . add ( doc_id )
else :
doc_ids_by_kb . setdefault ( kb_ids , set ( ) ) . add ( doc_id )
if not doc_ids_by_kb :
return
# Resolve service lazily so callers/tests that swap service modules at runtime
# (e.g. via monkeypatch) don't get stuck with a stale class reference.
from api . db . services . doc_metadata_service import DocMetadataService
2026-07-03 12:53:39 +08:00
2026-04-30 18:13:27 +03:00
metadata_getter = getattr ( DocMetadataService , " get_metadata_for_documents " , None )
if not callable ( metadata_getter ) :
2026-07-03 12:53:39 +08:00
logging . warning ( " DocMetadataService.get_metadata_for_documents is unavailable; skipping metadata enrichment. " )
2026-04-30 18:13:27 +03:00
return
meta_by_doc : dict [ str , dict ] = { }
for kb_id , doc_ids in doc_ids_by_kb . items ( ) :
meta_map = metadata_getter ( list ( doc_ids ) , kb_id )
if meta_map :
meta_by_doc . update ( meta_map )
logging . debug ( " Fetched metadata for %d docs in kb_id= %s " , len ( meta_map ) , kb_id )
for chunk in chunks :
doc_id = chunk . get ( doc_field )
if not doc_id :
continue
meta = meta_by_doc . get ( doc_id )
if not meta :
continue
if metadata_fields is not None :
meta = { k : v for k , v in meta . items ( ) if k in metadata_fields }
if meta :
chunk [ output_field ] = meta
logging . debug ( " Enriched chunk for doc_id= %s with %d metadata fields: %s " , doc_id , len ( meta ) , list ( meta . keys ( ) ) )