mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
Feature/table parser column roles (#13710)
### What problem does this PR solve? The table file parser (CSV/Excel) currently treats all columns identically — every column is both vectorized (embedded in chunk text) and stored as filterable metadata. There's no way for users to control which columns should be searchable by semantic meaning versus which should only be filterable attributes. For example, when ingesting a news articles CSV with columns like title, content, country, category, source, etc., the embedding includes metadata fields like country: Brazil and source: Reuters in the chunk text, which dilutes the semantic quality of the embedding without adding retrieval value. The RDBMS connector (MySQL/PostgreSQL) already supports content_columns / metadata_columns, but this capability was missing for file-based table ingestion. This PR adds column-level control (vectorize / metadata / both) for the table file parser, following RAGFlow's existing patterns. Backward compatible: Datasets without table_column_roles or with table_column_mode: auto behave exactly as before (all columns = both). ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@@ -377,6 +377,9 @@ class AutoMetadataConfig(Base):
|
||||
built_in_metadata: Annotated[list[AutoMetadataField], Field(default_factory=list)]
|
||||
|
||||
|
||||
TableColumnRole = Literal["indexing", "metadata", "both"]
|
||||
|
||||
|
||||
class ParserConfig(Base):
|
||||
auto_keywords: Annotated[int, Field(default=0, ge=0, le=32)]
|
||||
auto_questions: Annotated[int, Field(default=0, ge=0, le=10)]
|
||||
@@ -393,6 +396,25 @@ class ParserConfig(Base):
|
||||
task_page_size: Annotated[int | None, Field(default=None, ge=1)]
|
||||
pages: Annotated[list[list[int]] | None, Field(default=None)]
|
||||
ext: Annotated[dict, Field(default={})]
|
||||
# Table parser: column name -> "indexing" | "metadata" | "both". Absence => all columns "both".
|
||||
# Table parser: "auto" = all columns both (default), "manual" = use table_column_roles. None → treated as "auto".
|
||||
table_column_mode: Annotated[Literal["auto", "manual"] | None, Field(default=None)]
|
||||
# Table parser: column name -> "indexing" | "metadata" | "both". Used only when table_column_mode == "manual".
|
||||
table_column_roles: Annotated[dict[str, TableColumnRole] | None, Field(default=None)]
|
||||
# Table parser: list of column names (set by backend after first parse; used by frontend for role selector).
|
||||
table_column_names: Annotated[list[str] | None, Field(default=None)]
|
||||
|
||||
@field_validator("table_column_roles", mode="before")
@classmethod
def legacy_vectorize_table_column_role(cls, v: Any) -> Any:
    """Normalize legacy role value *vectorize* to *indexing* (chunk text + full-text search).

    Registered as a ``mode="before"`` validator for ``table_column_roles``.

    Args:
        v: The incoming ``table_column_roles`` value.

    Returns:
        ``v`` unchanged when it is not a dict (this includes ``None``).
        Otherwise a new dict in which every key is a ``str`` (non-string keys
        are passed through ``str()``) and every value equal to ``"vectorize"``
        is replaced by ``"indexing"``; all other values are kept as-is.
    """
    if not isinstance(v, dict):
        return v
    return {
        (key if isinstance(key, str) else str(key)): ("indexing" if val == "vectorize" else val)
        for key, val in v.items()
    }
|
||||
|
||||
|
||||
class UpdateDocumentReq(Base):
|
||||
|
||||
139
rag/app/table.py
139
rag/app/table.py
@@ -36,6 +36,7 @@ from rag.nlp import rag_tokenizer, tokenize, tokenize_table
|
||||
from deepdoc.parser import ExcelParser
|
||||
from common import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Excel(ExcelParser):
|
||||
def __call__(self, fnm, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER, callback=None, **kwargs):
|
||||
@@ -372,6 +373,11 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER,
|
||||
|
||||
Every row in table will be treated as a chunk.
|
||||
"""
|
||||
_pc0 = kwargs.get("parser_config") or {}
|
||||
logger.debug(f"[TABLE_PARSER_DEBUG] parser_config keys: {list(_pc0.keys())}")
|
||||
logger.debug(f"[TABLE_PARSER_DEBUG] table_column_mode: {_pc0.get('table_column_mode')}")
|
||||
logger.debug(f"[TABLE_PARSER_DEBUG] table_column_roles: {_pc0.get('table_column_roles')}")
|
||||
|
||||
tbls = []
|
||||
is_english = lang.lower() == "english"
|
||||
if re.search(r"\.xlsx?$", filename, re.IGNORECASE):
|
||||
@@ -435,6 +441,19 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER,
|
||||
# Field type suffixes for database columns
|
||||
# Maps data types to their database field suffixes
|
||||
fields_map = {"text": "_tks", "int": "_long", "keyword": "_kwd", "float": "_flt", "datetime": "_dt", "bool": "_kwd"}
|
||||
parser_config = kwargs.get("parser_config") or {}
|
||||
if parser_config.get("table_column_mode") == "manual":
|
||||
column_roles = parser_config.get("table_column_roles") or {}
|
||||
else:
|
||||
column_roles = {}
|
||||
logger.debug(
|
||||
f"[TABLE_PARSER_DEBUG] effective table_column_mode={parser_config.get('table_column_mode')!r}, "
|
||||
f"column_roles keys={list(column_roles.keys())}"
|
||||
)
|
||||
|
||||
# Pass 1: infer columns per sheet (multi-sheet Excel => multiple DataFrames). Merge field_map and
|
||||
# table_column_names, then update KB once so the UI role selector sees all columns, not only the last sheet.
|
||||
sheet_specs = []
|
||||
for df in dfs:
|
||||
for n in ["id", "_id", "index", "idx"]:
|
||||
if n in df.columns:
|
||||
@@ -457,22 +476,64 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER,
|
||||
txts.extend([str(c) for c in cln if c])
|
||||
clmns_map = [(py_clmns[i].lower() + fields_map[clmn_tys[i]], str(clmns[i]).replace("_", " ")) for i in
|
||||
range(len(clmns))]
|
||||
# For Infinity/OceanBase: Use original column names as keys since they're stored in chunk_data JSON
|
||||
# For ES/OS: Use full field names with type suffixes (e.g., url_kwd, body_tks)
|
||||
# field_map: only columns stored in chunk_data (metadata or both) — used for retrieval/SQL
|
||||
stored_indices = [
|
||||
i for i in range(len(clmns))
|
||||
if column_roles.get(clmns[i], "both") in ("metadata", "both")
|
||||
]
|
||||
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
|
||||
# For Infinity/OceanBase: key = original column name, value = display name
|
||||
field_map = {py_clmns[i].lower(): str(clmns[i]).replace("_", " ") for i in range(len(clmns))}
|
||||
field_map = {
|
||||
py_clmns[i].lower(): str(clmns[i]).replace("_", " ")
|
||||
for i in stored_indices
|
||||
}
|
||||
else:
|
||||
# For ES/OS: key = typed field name, value = display name
|
||||
field_map = {k: v for k, v in clmns_map}
|
||||
logging.debug(f"Field map: {field_map}")
|
||||
KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": field_map})
|
||||
field_map = {
|
||||
clmns_map[i][0]: clmns_map[i][1]
|
||||
for i in stored_indices
|
||||
}
|
||||
logging.debug(f"Field map (sheet): {field_map}")
|
||||
sheet_specs.append(
|
||||
{
|
||||
"df": df,
|
||||
"clmns": clmns,
|
||||
"clmn_tys": clmn_tys,
|
||||
"clmns_map": clmns_map,
|
||||
"py_clmns": py_clmns,
|
||||
"field_map": field_map,
|
||||
}
|
||||
)
|
||||
|
||||
eng = lang.lower() == "english" # is_english(txts)
|
||||
merged_field_map = {}
|
||||
merged_table_column_names = []
|
||||
seen_col = set()
|
||||
for spec in sheet_specs:
|
||||
merged_field_map.update(spec["field_map"])
|
||||
for col in spec["clmns"]:
|
||||
if col not in seen_col:
|
||||
seen_col.add(col)
|
||||
merged_table_column_names.append(col)
|
||||
|
||||
logging.debug(f"Field map (merged across sheets): {merged_field_map}")
|
||||
kb_id = kwargs.get("kb_id")
|
||||
if kb_id:
|
||||
KnowledgebaseService.update_parser_config(
|
||||
kb_id,
|
||||
{"field_map": merged_field_map, "table_column_names": merged_table_column_names},
|
||||
)
|
||||
|
||||
eng = lang.lower() == "english" # is_english(txts)
|
||||
for spec in sheet_specs:
|
||||
df = spec["df"]
|
||||
clmns = spec["clmns"]
|
||||
clmn_tys = spec["clmn_tys"]
|
||||
clmns_map = spec["clmns_map"]
|
||||
py_clmns = spec["py_clmns"]
|
||||
_debug_row_idx = 0
|
||||
for ii, row in df.iterrows():
|
||||
_debug_row_idx += 1
|
||||
d = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))}
|
||||
row_fields = []
|
||||
data_json = {} # For Infinity: Store all columns in a JSON object
|
||||
text_fields = [] # indexing + both -> content_with_weight
|
||||
stored = {} # metadata + both -> chunk_data (Infinity) or typed fields (ES)
|
||||
for j in range(len(clmns)):
|
||||
if row[clmns[j]] is None:
|
||||
continue
|
||||
@@ -480,27 +541,49 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER,
|
||||
continue
|
||||
if not isinstance(row[clmns[j]], pd.Series) and pd.isna(row[clmns[j]]):
|
||||
continue
|
||||
# For Infinity/OceanBase: Store in chunk_data JSON column
|
||||
# For Elasticsearch/OpenSearch: Store as individual fields with type suffixes
|
||||
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
|
||||
data_json[str(clmns[j])] = row[clmns[j]]
|
||||
else:
|
||||
fld = clmns_map[j][0]
|
||||
d[fld] = row[clmns[j]] if clmn_tys[j] != "text" else rag_tokenizer.tokenize(row[clmns[j]])
|
||||
row_fields.append((clmns[j], row[clmns[j]]))
|
||||
if not row_fields:
|
||||
col_name = clmns[j]
|
||||
role = column_roles.get(col_name, "both")
|
||||
if _debug_row_idx == 1:
|
||||
logger.debug(f"[TABLE_PARSER_DEBUG] Column '{col_name}' -> role '{role}'")
|
||||
if role in ("indexing", "vectorize", "both"):
|
||||
text_fields.append((col_name, row[col_name]))
|
||||
if role in ("metadata", "both"):
|
||||
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
|
||||
stored[str(col_name)] = row[col_name]
|
||||
else:
|
||||
fld = clmns_map[j][0]
|
||||
if clmn_tys[j] != "text":
|
||||
stored[fld] = row[col_name]
|
||||
else:
|
||||
cell = row[col_name]
|
||||
stored[fld] = rag_tokenizer.tokenize(cell)
|
||||
raw_s = str(cell).strip() if cell is not None else ""
|
||||
if raw_s:
|
||||
stored[f"{py_clmns[j].lower()}_raw"] = raw_s
|
||||
if not text_fields and not stored:
|
||||
continue
|
||||
# Add the data JSON field to the document (for Infinity/OceanBase)
|
||||
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
|
||||
d["chunk_data"] = data_json
|
||||
# Format as a structured text for better LLM comprehension
|
||||
# Format each field as "- Field Name: Value" on separate lines
|
||||
formatted_text = "\n".join([f"- {field}: {value}" for field, value in row_fields])
|
||||
if stored:
|
||||
d["chunk_data"] = stored
|
||||
else:
|
||||
d.update(stored)
|
||||
formatted_text = "\n".join([f"- {field}: {value}" for field, value in text_fields]) if text_fields else ""
|
||||
tokenize(d, formatted_text, eng)
|
||||
if _debug_row_idx == 1:
|
||||
logger.debug(
|
||||
f"[TABLE_PARSER_DEBUG] Chunk content_with_weight length: {len(d.get('content_with_weight', '') or '')}"
|
||||
)
|
||||
_cd = d.get("chunk_data")
|
||||
logger.debug(
|
||||
f"[TABLE_PARSER_DEBUG] Chunk chunk_data keys: {list(_cd.keys()) if isinstance(_cd, dict) else 'N/A'}"
|
||||
)
|
||||
if not (settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE):
|
||||
_extra = [k for k in d if k not in ("docnm_kwd", "title_tks", "content_with_weight", "content_ltks", "content_sm_ltks")]
|
||||
logger.debug(f"[TABLE_PARSER_DEBUG] Chunk ES extra field keys (sample): {_extra[:20]}")
|
||||
res.append(d)
|
||||
if tbls:
|
||||
doc = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))}
|
||||
res.extend(tokenize_table(tbls, doc, is_english))
|
||||
if tbls:
|
||||
doc = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))}
|
||||
res.extend(tokenize_table(tbls, doc, is_english))
|
||||
callback(0.35, "")
|
||||
|
||||
return res
|
||||
|
||||
@@ -79,9 +79,15 @@ from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc
|
||||
from common.exceptions import TaskCanceledException
|
||||
from common import settings
|
||||
from common.constants import PAGERANK_FLD, TAG_FLD, SVR_CONSUMER_GROUP_NAME
|
||||
from rag.utils.table_es_metadata import (
|
||||
aggregate_table_manual_doc_metadata,
|
||||
merge_table_parser_config_from_kb,
|
||||
table_parser_strip_doc_metadata_keys,
|
||||
)
|
||||
|
||||
BATCH_SIZE = 64
|
||||
|
||||
|
||||
FACTORY = {
|
||||
"general": naive,
|
||||
ParserType.NAIVE.value: naive,
|
||||
@@ -268,6 +274,16 @@ async def build_chunks(task, progress_callback):
|
||||
logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
|
||||
raise
|
||||
|
||||
# Table parser column roles / mode are stored on the dataset (KB) parser_config;
|
||||
# chunk tasks carry document-level parser_config only — merge KB keys so manual roles apply.
|
||||
parser_config_for_chunk = merge_table_parser_config_from_kb(task)
|
||||
if task.get("parser_id", "").lower() == "table" and task.get("kb_parser_config"):
|
||||
logging.debug(
|
||||
"[TASK_EXECUTOR_DEBUG] table parser: merged KB keys into parser_config for chunk; "
|
||||
f"mode={parser_config_for_chunk.get('table_column_mode')}, "
|
||||
f"roles_keys={list((parser_config_for_chunk.get('table_column_roles') or {}).keys())}"
|
||||
)
|
||||
|
||||
try:
|
||||
async with chunk_limiter:
|
||||
cks = await thread_pool_exec(
|
||||
@@ -279,7 +295,7 @@ async def build_chunks(task, progress_callback):
|
||||
lang=task["language"],
|
||||
callback=progress_callback,
|
||||
kb_id=task["kb_id"],
|
||||
parser_config=task["parser_config"],
|
||||
parser_config=parser_config_for_chunk,
|
||||
tenant_id=task["tenant_id"],
|
||||
)
|
||||
logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))
|
||||
@@ -1262,6 +1278,43 @@ async def do_handle_task(task):
|
||||
|
||||
DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
|
||||
|
||||
# Table parser (manual): push metadata/both column values to document-level metadata for UI / chat filters
|
||||
if task.get("parser_id", "").lower() == "table":
|
||||
eff_pc = merge_table_parser_config_from_kb(task)
|
||||
logging.debug(
|
||||
f"[TABLE_META_DEBUG] table post-index: table_column_mode={eff_pc.get('table_column_mode')!r}"
|
||||
)
|
||||
if eff_pc.get("table_column_mode") == "manual":
|
||||
try:
|
||||
agg = aggregate_table_manual_doc_metadata(chunks, task)
|
||||
logging.debug(f"[TABLE_META_DEBUG] aggregated metadata: {agg}")
|
||||
strip_keys = table_parser_strip_doc_metadata_keys(eff_pc)
|
||||
existing = DocMetadataService.get_document_metadata(task_doc_id)
|
||||
existing = existing if isinstance(existing, dict) else {}
|
||||
preserved = {k: v for k, v in existing.items() if k not in strip_keys}
|
||||
merged = update_metadata_to(dict(preserved), agg)
|
||||
logging.debug(
|
||||
f"[TABLE_META_DEBUG] calling update_document_metadata for doc_id={task_doc_id}, "
|
||||
f"meta_fields keys={list(merged.keys())}, "
|
||||
f"table_strip_key_count={len(strip_keys)}, agg_keys={list(agg.keys())}"
|
||||
)
|
||||
try:
|
||||
DocMetadataService.update_document_metadata(task_doc_id, merged)
|
||||
logging.debug("[TABLE_META_DEBUG] update_document_metadata succeeded")
|
||||
except Exception as ue:
|
||||
logging.error(
|
||||
"update_document_metadata failed (table parser, doc_id=%s): %s",
|
||||
task_doc_id,
|
||||
ue,
|
||||
exc_info=True,
|
||||
)
|
||||
except Exception as e:
|
||||
logging.exception(
|
||||
"Table parser document metadata aggregation failed (doc_id=%s): %s",
|
||||
task_doc_id,
|
||||
e,
|
||||
)
|
||||
|
||||
progress_callback(msg="Indexing done ({:.2f}s).".format(timer() - start_ts))
|
||||
|
||||
if toc_thread:
|
||||
|
||||
296
rag/utils/table_es_metadata.py
Normal file
296
rag/utils/table_es_metadata.py
Normal file
@@ -0,0 +1,296 @@
|
||||
#
|
||||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
"""Table manual-mode ES field resolution and document metadata aggregation (lightweight; used by task_executor)."""
|
||||
|
||||
import logging
|
||||
|
||||
from common import settings
|
||||
from common.metadata_utils import dedupe_list
|
||||
|
||||
|
||||
def _knowledgebase_service_cls():
    """Lazy import for KnowledgebaseService (used by aggregate; mockable in unit tests).

    Returns:
        The ``KnowledgebaseService`` class from
        ``api.db.services.knowledgebase_service``. The import runs inside the
        function body, i.e. only when this function is called.
    """
    from api.db.services.knowledgebase_service import KnowledgebaseService

    return KnowledgebaseService
|
||||
|
||||
|
||||
def merge_table_parser_config_from_kb(task: dict) -> dict:
    """Merge dataset-level table parser keys into document parser_config (see build_chunks).

    Args:
        task: Task dict. Reads ``parser_config``, ``parser_id`` and
            ``kb_parser_config``.

    Returns:
        The task's own ``parser_config`` (or ``{}``) when the parser is not
        ``"table"`` (case-insensitive) or ``kb_parser_config`` is empty.
        Otherwise a shallow copy of ``parser_config`` in which
        ``table_column_mode``, ``table_column_roles`` and
        ``table_column_names`` are overwritten by the values present in
        ``kb_parser_config``. The input dicts are never mutated.
    """
    pc = task.get("parser_config") or {}
    kb_pc = task.get("kb_parser_config")
    # ``parser_id`` may be present but None (or not a string); treat that as
    # "not the table parser" instead of raising AttributeError on ``.lower()``.
    parser_id = str(task.get("parser_id") or "").lower()
    if parser_id != "table" or not kb_pc:
        return pc
    merged = dict(pc)
    for key in ("table_column_mode", "table_column_roles", "table_column_names"):
        if key in kb_pc:
            merged[key] = kb_pc[key]
    return merged
|
||||
|
||||
|
||||
def table_parser_strip_doc_metadata_keys(eff_parser_config: dict) -> frozenset[str]:
    """
    Table manual mode stores per-column values under document metadata keys equal to the
    CSV column name. On reparse, strip these keys from existing metadata before merging
    a fresh aggregate so columns switched to indexing-only (or removed) do not persist.

    Args:
        eff_parser_config: Effective parser config. ``table_column_names`` is
            used when non-empty; otherwise the keys of ``table_column_roles``.

    Returns:
        A frozenset of key names to drop. Entries that are ``None`` or blank
        after stripping are ignored. For every remaining entry both the
        whitespace-stripped name and the raw ``str()`` form are included: the
        aggregate in this module keys its output by the raw column name, so a
        name with surrounding whitespace would otherwise never be stripped.
    """
    names = eff_parser_config.get("table_column_names")
    source = names if names else (eff_parser_config.get("table_column_roles") or {})
    keys: set[str] = set()
    for item in source:
        if item is None:
            continue
        raw = str(item)
        stripped = raw.strip()
        if not stripped:
            continue
        keys.add(stripped)
        keys.add(raw)
    return frozenset(keys)
|
||||
|
||||
|
||||
def _field_map_typed_key_for_column(field_map: dict, col: str) -> str | None:
|
||||
"""Map CSV column name to ES typed field key (field_map: typed_key -> display name)."""
|
||||
if not field_map or not col:
|
||||
return None
|
||||
col_s = str(col).strip()
|
||||
col_norm = col_s.replace("_", " ").strip().lower()
|
||||
for tk, disp in field_map.items():
|
||||
disp_s = str(disp).strip()
|
||||
if disp_s.lower() == col_norm or disp_s.lower() == col_s.lower():
|
||||
return tk
|
||||
return None
|
||||
|
||||
|
||||
def _probe_es_typed_key_for_column(col: str, sample_chunk: dict) -> str | None:
|
||||
"""
|
||||
When field_map is missing/stale, try to infer the ES field key present on a chunk.
|
||||
Table chunks use normalized/pinyin keys of the form <normalized_base><suffix>, where suffix is
|
||||
one of: _raw, _tks, _dt, _long, _flt, _kwd (see rag/app/table.py).
|
||||
"""
|
||||
if not col or not isinstance(sample_chunk, dict):
|
||||
return None
|
||||
base_raw = str(col).strip()
|
||||
if not base_raw:
|
||||
return None
|
||||
base_norm = base_raw.replace("_", " ").strip().lower().replace(" ", "")
|
||||
suffixes = ("_tks", "_raw", "_dt", "_long", "_flt", "_kwd")
|
||||
for key in sample_chunk.keys():
|
||||
key_s = str(key)
|
||||
if not key_s:
|
||||
continue
|
||||
key_norm = key_s.strip().lower()
|
||||
if key_norm == base_raw.lower() or key_norm.replace("_", "").replace(" ", "") == base_norm:
|
||||
return key_s
|
||||
for key in sample_chunk.keys():
|
||||
key_s = str(key)
|
||||
if not key_s:
|
||||
continue
|
||||
key_lower = key_s.lower()
|
||||
for sfx in suffixes:
|
||||
if key_lower.endswith(sfx):
|
||||
core = key_lower[: -len(sfx)]
|
||||
core_norm = core.replace("_", "").replace(" ", "")
|
||||
if core_norm == base_norm:
|
||||
return key_s
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_es_chunk_field_key(
    col: str, field_map: dict, sample_chunk: dict | None
) -> tuple[str | None, str]:
    """Prefer field_map when key exists on chunk; else probe by suffix (matches table.py naming).

    Args:
        col: Column name to resolve.
        field_map: Typed-key -> display-name mapping; may be empty.
        sample_chunk: A representative chunk dict, or ``None``/empty when no
            chunk is available to check against.

    Returns:
        ``(key, source)`` where ``source`` is one of:

        * ``"field_map"`` - key came from ``field_map`` and is present on the
          chunk, or no chunk was available to check.
        * ``"probe"`` - ``field_map`` had no match; key was found by probing
          the chunk.
        * ``"probe_field_map_mismatch"`` - ``field_map`` had a match that is
          absent on the chunk, and probing found a different key.
        * ``"field_map_absent_on_chunk"`` - ``field_map`` match is absent on
          the chunk and probing found nothing; the ``field_map`` key is
          returned anyway.
        * ``"none"`` - nothing resolved; ``key`` is ``None``.
    """
    tk_fm = _field_map_typed_key_for_column(field_map, col) if field_map else None

    if not sample_chunk:
        # Nothing to verify against: trust field_map if it had an answer.
        return (tk_fm, "field_map") if tk_fm else (None, "none")

    if tk_fm and tk_fm in sample_chunk:
        return tk_fm, "field_map"

    probed = _probe_es_typed_key_for_column(col, sample_chunk)
    if probed:
        return probed, ("probe_field_map_mismatch" if tk_fm else "probe")

    if tk_fm:
        return tk_fm, "field_map_absent_on_chunk"
    return None, "none"
|
||||
|
||||
|
||||
def _value_to_meta_string(val) -> str | None:
|
||||
"""Normalize chunk field values for DocMetadataService (strings / list of strings only)."""
|
||||
if val is None:
|
||||
return None
|
||||
if isinstance(val, bool):
|
||||
return str(val).lower()
|
||||
if isinstance(val, (int, float)):
|
||||
return str(val)
|
||||
if isinstance(val, str):
|
||||
s = val.strip()
|
||||
return s if s else None
|
||||
return str(val)
|
||||
|
||||
|
||||
def _es_raw_field_key_from_typed(tk: str | None) -> str | None:
|
||||
"""ES text columns use *_tks (tokenized); raw display value is stored as {same_base}_raw (see rag/app/table.py)."""
|
||||
if not tk or not tk.endswith("_tks"):
|
||||
return None
|
||||
return tk[: -len("_tks")] + "_raw"
|
||||
|
||||
|
||||
def _es_field_value_to_doc_metadata(val, *, from_tks_fallback: bool) -> str | None:
|
||||
"""Prefer raw strings; for legacy *_tks tokenized fields, normalize list/str to a single display string."""
|
||||
if val is None:
|
||||
return None
|
||||
if from_tks_fallback and isinstance(val, list):
|
||||
parts = [str(x).strip() for x in val if x is not None and str(x).strip()]
|
||||
if not parts:
|
||||
return None
|
||||
return " ".join(parts)
|
||||
return _value_to_meta_string(val)
|
||||
|
||||
|
||||
def aggregate_table_manual_doc_metadata(chunks: list, task: dict) -> dict:
    """
    Collect unique values per metadata/both column across chunks for document-level metadata.

    Used when table_column_mode == manual (parallel to LLM gen_metadata, no schema required).

    Args:
        chunks: Chunk dicts produced for the document; non-dict entries are ignored.
        task: Task dict. Reads ``parser_config`` / ``kb_parser_config`` (via
            ``merge_table_parser_config_from_kb``), ``kb_parser_config["field_map"]``
            and ``kb_id``.

    Returns:
        ``{column_name: dedupe_list(values)}`` for every metadata/both column that
        produced at least one non-empty string value. ``{}`` when the effective
        ``table_column_mode`` is not ``"manual"`` or no column has role
        ``metadata``/``both``.
    """
    logging.debug(
        f"[TABLE_META_DEBUG] aggregate_table_manual_doc_metadata called with {len(chunks)} chunks"
    )
    eff = merge_table_parser_config_from_kb(task)
    if eff.get("table_column_mode") != "manual":
        logging.debug(
            f"[TABLE_META_DEBUG] skip aggregate: table_column_mode={eff.get('table_column_mode')!r}"
        )
        return {}

    # Columns whose role is metadata/both. With a known column list, columns
    # missing from the roles mapping default to "both".
    roles = eff.get("table_column_roles") or {}
    table_column_names = eff.get("table_column_names") or []
    stored_roles = ("metadata", "both")
    if table_column_names:
        meta_cols = [col for col in table_column_names if roles.get(col, "both") in stored_roles]
    else:
        meta_cols = [c for c, r in roles.items() if r in stored_roles]
    if not meta_cols:
        logging.debug(
            "[TABLE_META_DEBUG] skip aggregate: no metadata/both columns "
            f"(table_column_names_present={bool(table_column_names)})"
        )
        return {}

    # field_map: task snapshot first; if empty, best-effort reload from the KB record.
    fm = (task.get("kb_parser_config") or {}).get("field_map") or {}
    kb_id = task.get("kb_id")
    if not fm and kb_id:
        try:
            KBS = _knowledgebase_service_cls()
            ok, kb = KBS.get_by_id(kb_id)
            if ok and kb:
                fresh_pc = kb.parser_config or {}
                reloaded = fresh_pc.get("field_map") or {}
                if reloaded:
                    fm = reloaded
                    logging.debug(
                        f"[TABLE_META_DEBUG] reloaded field_map from DB: {len(fm)} entries"
                    )
                else:
                    logging.debug(
                        "[TABLE_META_DEBUG] KB reload: parser_config has no field_map yet; "
                        "will use ES key probe on chunk dicts if applicable"
                    )
        except Exception as e:
            # Deliberately non-fatal: aggregation continues without a field_map.
            logging.debug(
                "[TABLE_META_DEBUG] failed to reload field_map from DB: %s",
                e,
                exc_info=True,
            )

    # True -> values live in the chunk's "chunk_data" dict; False -> typed fields on the chunk.
    uses_chunk_data = bool(settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE)

    if not fm and not uses_chunk_data:
        logging.debug(
            "[TABLE_META_DEBUG] field_map empty on task snapshot — will use ES key probe on chunk dicts; "
            f"kb_parser_config keys={list((task.get('kb_parser_config') or {}).keys())}"
        )
    logging.debug(
        f"[TABLE_META_DEBUG] meta_cols={meta_cols}, field_map entries={len(fm)}, "
        f"infinity={settings.DOC_ENGINE_INFINITY}, oceanbase={settings.DOC_ENGINE_OCEANBASE}"
    )

    sample_ck = next((c for c in chunks if isinstance(c, dict)), None)
    if sample_ck:
        sk = [
            k
            for k in sample_ck.keys()
            if not (str(k).startswith("q_") and str(k).endswith("_vec"))
        ][:50]
        logging.debug(f"[TABLE_META_DEBUG] first chunk non-vector keys (sample): {sk}")

    # Resolve each column's typed field key once, against the sample chunk.
    es_col_keys: dict[str, tuple[str | None, str]] = {}
    if not uses_chunk_data:
        for col in meta_cols:
            tk, src = _resolve_es_chunk_field_key(col, fm, sample_ck)
            es_col_keys[col] = (tk, src)
            logging.debug(
                f"[TABLE_META_DEBUG] column '{col}' -> ES key {tk!r} (source={src})"
            )

    acc: dict[str, list] = {c: [] for c in meta_cols}

    for i, ck in enumerate(chunks):
        if not isinstance(ck, dict):
            continue

        if uses_chunk_data:
            cd = ck.get("chunk_data")
            if not isinstance(cd, dict):
                continue
            for col in meta_cols:
                if col not in cd:
                    continue
                s = _value_to_meta_string(cd[col])
                if s is not None:
                    acc[col].append(s)
            continue

        for col in meta_cols:
            tk, _src = es_col_keys.get(col, (None, "none"))
            if not tk:
                if i == 0:
                    logging.debug(
                        f"[TABLE_META_DEBUG] no resolved ES key for column '{col}'"
                    )
                continue
            # Prefer the *_raw sibling of a *_tks key; fall back to the typed key itself.
            raw_k = _es_raw_field_key_from_typed(tk)
            from_tks = False
            if raw_k and raw_k in ck:
                val = ck[raw_k]
            elif tk in ck:
                val = ck[tk]
                from_tks = tk.endswith("_tks")
            else:
                if i == 0:
                    logging.debug(
                        f"[TABLE_META_DEBUG] chunk missing ES field {tk!r}"
                        f"{' and ' + raw_k + ' (raw)' if raw_k else ''} for column '{col}'"
                    )
                continue
            s = _es_field_value_to_doc_metadata(val, from_tks_fallback=from_tks)
            if s is not None:
                acc[col].append(s)

    for col, vals in acc.items():
        logging.debug(
            "[TABLE_META_DEBUG] Column '%s' values found (count=%d)",
            col,
            len(vals),
        )

    # Columns with no collected values are omitted from the result.
    out = {col: dedupe_list(vals) for col, vals in acc.items() if vals}
    logging.debug(
        f"[TABLE_META_DEBUG] aggregated metadata dict keys={list(out.keys())}, "
        f"sizes={[len(v) for v in out.values()]}"
    )
    return out
|
||||
@@ -18,14 +18,15 @@
|
||||
|
||||
from unittest.mock import Mock
|
||||
from api.utils.validation_utils import (
|
||||
validate_immutable_fields,
|
||||
ParserConfig,
|
||||
UpdateDocumentReq,
|
||||
validate_chunk_method,
|
||||
validate_document_name,
|
||||
validate_chunk_method
|
||||
validate_immutable_fields,
|
||||
)
|
||||
from api.constants import FILE_NAME_LEN_LIMIT
|
||||
from api.db import FileType
|
||||
from common.constants import RetCode
|
||||
from api.utils.validation_utils import UpdateDocumentReq
|
||||
|
||||
|
||||
def test_validate_immutable_fields_no_changes():
|
||||
@@ -299,4 +300,15 @@ def test_validate_chunk_method_other_extensions_still_valid():
|
||||
|
||||
error_msg, error_code = validate_chunk_method(doc)
|
||||
assert error_msg is None
|
||||
assert error_code is None
|
||||
assert error_code is None
|
||||
|
||||
|
||||
def test_parser_config_normalizes_legacy_vectorize_table_column_role():
    """``ParserConfig`` rewrites role ``vectorize`` to ``indexing`` and keeps other roles unchanged."""
    p = ParserConfig(
        table_column_roles={"title": "vectorize", "country": "metadata", "x": "both"},
    )
    assert p.table_column_roles == {
        "title": "indexing",
        "country": "metadata",
        "x": "both",
    }
|
||||
0
test/unit_test/rag/app/__init__.py
Normal file
0
test/unit_test/rag/app/__init__.py
Normal file
235
test/unit_test/rag/app/test_table_chunk_column_roles.py
Normal file
235
test/unit_test/rag/app/test_table_chunk_column_roles.py
Normal file
@@ -0,0 +1,235 @@
|
||||
#
|
||||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
#
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
|
||||
# OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
# for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
#
|
||||
|
||||
"""Integration-style tests for rag.app.table.chunk() column roles (mocked KB + tokenizer)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Mock heavy modules that trigger ONNX model loading at import time
|
||||
# table.py -> deepdoc.parser.figure_parser -> rag.app.picture -> OCR()
|
||||
# Register placeholder modules before rag.app.table is imported, so the
# import chain named in the comment above does not load the real ones.
# Modules that are already imported are left untouched.
for mod in [
    "deepdoc.vision.ocr",
    "deepdoc.parser.figure_parser",
    "rag.app.picture",
]:
    if mod not in sys.modules:
        sys.modules[mod] = MagicMock()
|
||||
|
||||
import warnings
|
||||
|
||||
# Importing rag.app.table pulls api -> rag.llm -> deepdoc -> xgboost; xgboost may warn on
|
||||
# pkg_resources in a way that breaks its compat shim unless pkg_resources loads first.
|
||||
warnings.filterwarnings("ignore", message=".*pkg_resources is deprecated.*", category=UserWarning)
|
||||
import pkg_resources # noqa: F401 — stabilize xgboost import during collection
|
||||
|
||||
import pytest
|
||||
|
||||
import common.settings as settings
|
||||
from rag.app.table import chunk
|
||||
|
||||
# chunk() removes columns named id, _id, index, idx — use row_id instead of id.
# Header row plus three data rows.
TEST_CSV = b"""row_id,title,content,country,category
1,Earthquake hits Turkey,A 5.8 magnitude earthquake struck Konya,Turkey,Disaster
2,Oil prices surge,Brent crude jumped 4.2 percent,Global,Economy
3,AI regulation proposed,EU unveiled a draft regulation,EU,Technology
"""

# File name and knowledge-base id handed to chunk() by _run_chunk.
FILENAME = "test.csv"
KB_ID = "test_kb_id"
|
||||
|
||||
|
||||
def _noop_callback(*_a, **_k):
    """Progress callback stand-in: accepts any arguments and does nothing."""
    return None
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _es_doc_engine(monkeypatch):
    """Set both ``DOC_ENGINE_INFINITY`` and ``DOC_ENGINE_OCEANBASE`` to False for every test.

    With both flags off, the assertions in this module can rely on typed
    per-column fields (``*_raw``, ``*_long``) being present on each chunk.
    """
    for flag in ("DOC_ENGINE_INFINITY", "DOC_ENGINE_OCEANBASE"):
        monkeypatch.setattr(settings, flag, False)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _stub_rag_tokenizer(monkeypatch):
    """Avoid NLTK / infinity tokenizer deps; keep string content inspectable.

    Replaces ``rag.nlp.rag_tokenizer.tokenize`` and
    ``rag.nlp.rag_tokenizer.fine_grained_tokenize`` with a function that
    simply returns ``str(line)``.
    """

    def fake_tokenize(line):
        return str(line)

    for target in (
        "rag.nlp.rag_tokenizer.tokenize",
        "rag.nlp.rag_tokenizer.fine_grained_tokenize",
    ):
        monkeypatch.setattr(target, fake_tokenize)
|
||||
|
||||
|
||||
@pytest.fixture
def mock_update_kb():
    """Patch ``rag.app.table.KnowledgebaseService.update_parser_config`` and yield the mock.

    The patch is active for the duration of the test that requests this fixture.
    """
    with patch("rag.app.table.KnowledgebaseService.update_parser_config") as m:
        yield m
|
||||
|
||||
|
||||
def _run_chunk(parser_config: dict, mock_update_kb: MagicMock):
    """Run ``chunk`` on the shared CSV fixture with the given parser config.

    ``mock_update_kb`` is not used in the body; taking it as a parameter makes
    every caller request the fixture that patches the knowledge-base update.
    """
    chunk_kwargs = {
        "binary": TEST_CSV,
        "callback": _noop_callback,
        "kb_id": KB_ID,
        "parser_config": parser_config,
        "lang": "Chinese",
    }
    return chunk(FILENAME, **chunk_kwargs)
|
||||
|
||||
|
||||
def test_chunk_auto_mode_all_columns_in_text_and_stored(mock_update_kb: MagicMock):
    """With an empty parser config, every column shows up in the text and as a stored field."""
    chunks = _run_chunk({}, mock_update_kb)
    assert len(chunks) == 3

    first = chunks[0]
    text = first["content_with_weight"]
    for expected in ("Earthquake hits Turkey", "Konya", "Turkey", "Disaster"):
        assert expected in text
    assert "1" in text or "row_id" in text

    # ES path: stored typed fields for text columns include *_tks and *_raw;
    # row_id is an int and is therefore stored as *_long.
    for stored_key in ("row_id_long", "title_raw", "country_raw"):
        assert stored_key in first
|
||||
|
||||
|
||||
def test_chunk_manual_mode_indexing_only(mock_update_kb: MagicMock):
    """Indexing-only columns appear in the text only; metadata-only columns are stored only."""
    roles = {
        "title": "indexing",
        "content": "indexing",
        "row_id": "metadata",
        "country": "metadata",
        "category": "metadata",
    }
    chunks = _run_chunk(
        {"table_column_mode": "manual", "table_column_roles": roles},
        mock_update_kb,
    )
    first = chunks[0]
    text = first["content_with_weight"]

    for needle in ("- title:", "Earthquake", "- content:", "Konya"):
        assert needle in text
    for label in ("- country:", "- category:", "- row_id:"):
        assert label not in text

    # The title/content columns must not be stored as table fields.
    for stored_key in ("title_raw", "content_raw"):
        assert stored_key not in first
    for stored_key in ("country_raw", "category_raw", "row_id_long"):
        assert stored_key in first
|
||||
|
||||
|
||||
def test_chunk_manual_mode_legacy_vectorize_role(mock_update_kb: MagicMock):
    """Stored configs may still use role *vectorize*; chunking treats it like *indexing*."""
    roles = {
        "title": "vectorize",
        "content": "indexing",
        "row_id": "metadata",
        "country": "metadata",
        "category": "metadata",
    }
    chunks = _run_chunk(
        {"table_column_mode": "manual", "table_column_roles": roles},
        mock_update_kb,
    )
    text = chunks[0]["content_with_weight"]

    for needle in ("- title:", "Earthquake", "- content:", "Konya"):
        assert needle in text
    assert "- country:" not in text
|
||||
|
||||
|
||||
def test_chunk_manual_mode_metadata_only(mock_update_kb: MagicMock):
    """When every column is metadata-only, the chunk text is blank but the fields are stored."""
    roles = {
        "title": "metadata",
        "content": "metadata",
        "row_id": "metadata",
        "country": "metadata",
        "category": "metadata",
    }
    chunks = _run_chunk(
        {"table_column_mode": "manual", "table_column_roles": roles},
        mock_update_kb,
    )
    first = chunks[0]

    # A missing or None text counts as empty, exactly like a whitespace-only one.
    text = first.get("content_with_weight") or ""
    assert text.strip() == ""
    for stored_key in ("country_raw", "title_raw"):
        assert stored_key in first
|
||||
|
||||
|
||||
def test_chunk_manual_mode_both(mock_update_kb: MagicMock):
    """Columns with role *both* appear in the chunk text and as stored fields."""
    column_names = ["title", "content", "country", "category", "row_id"]
    parser_config = {
        "table_column_mode": "manual",
        "table_column_roles": dict.fromkeys(column_names, "both"),
    }
    first = _run_chunk(parser_config, mock_update_kb)[0]
    text = first["content_with_weight"]

    for expected in ("Earthquake hits Turkey", "Turkey", "Disaster"):
        assert expected in text
    for stored_key in ("row_id_long", "title_raw", "country_raw"):
        assert stored_key in first
|
||||
|
||||
|
||||
def test_chunk_manual_mode_partial_roles_default_to_both(mock_update_kb: MagicMock):
    """Columns missing from the role map are treated as *both*."""
    explicit_roles = {"title": "indexing", "country": "metadata"}
    chunks = _run_chunk(
        {"table_column_mode": "manual", "table_column_roles": explicit_roles},
        mock_update_kb,
    )
    first = chunks[0]
    text = first["content_with_weight"]

    # Explicit roles are honoured.
    assert "- title:" in text
    assert "Earthquake" in text
    assert "- country:" not in text
    # Unlisted columns (row_id, content, category) still reach the text.
    for label in ("- row_id:", "- content:", "- category:"):
        assert label in text

    assert "title_raw" not in first
    for stored_key in ("country_raw", "country_tks", "content_raw", "category_raw"):
        assert stored_key in first
|
||||
|
||||
|
||||
def test_chunk_manual_mode_raw_fields_for_es(mock_update_kb: MagicMock):
    """Every text column with role *both* is stored as both ``<col>_raw`` and ``<col>_tks``."""
    all_columns = ["title", "content", "country", "category", "row_id"]
    parser_config = {
        "table_column_mode": "manual",
        "table_column_roles": dict.fromkeys(all_columns, "both"),
    }
    first = _run_chunk(parser_config, mock_update_kb)[0]

    expected_keys = [
        f"{col}_{suffix}"
        for col in ("title", "content", "country", "category")
        for suffix in ("raw", "tks")
    ]
    for stored_key in expected_keys:
        assert stored_key in first
|
||||
|
||||
|
||||
def test_chunk_updates_table_column_names(mock_update_kb: MagicMock):
    """``chunk`` reports the detected column names to the knowledge base exactly once."""
    _run_chunk({}, mock_update_kb)
    mock_update_kb.assert_called_once()

    # call_args is an (args, kwargs) pair; only the positional part is checked.
    positional = mock_update_kb.call_args[0]
    assert positional[0] == KB_ID
    payload = positional[1]
    assert payload["table_column_names"] == ["row_id", "title", "content", "country", "category"]
|
||||
|
||||
|
||||
def test_chunk_count_matches_row_count(mock_update_kb: MagicMock):
    """One chunk is produced per CSV data row (the fixture has three)."""
    produced = _run_chunk({}, mock_update_kb)
    assert len(produced) == 3
|
||||
1
test/unit_test/rag/svr/__init__.py
Normal file
1
test/unit_test/rag/svr/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Unit tests for rag/svr
|
||||
132
test/unit_test/rag/svr/test_table_column_roles_helpers.py
Normal file
132
test/unit_test/rag/svr/test_table_column_roles_helpers.py
Normal file
@@ -0,0 +1,132 @@
|
||||
#
|
||||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
"""Unit tests for ES table metadata helpers (rag.utils.table_es_metadata)."""
|
||||
|
||||
from rag.utils.table_es_metadata import (
|
||||
_es_field_value_to_doc_metadata,
|
||||
_es_raw_field_key_from_typed,
|
||||
_probe_es_typed_key_for_column,
|
||||
_resolve_es_chunk_field_key,
|
||||
merge_table_parser_config_from_kb,
|
||||
table_parser_strip_doc_metadata_keys,
|
||||
)
|
||||
|
||||
|
||||
class TestProbeEsTypedKeyForColumn:
    """``_probe_es_typed_key_for_column`` finds a column's typed key inside a chunk dict."""

    def test_probe_es_typed_key_tks(self):
        fields = {"country_tks": "tok", "other": 1}
        found = _probe_es_typed_key_for_column("country", fields)
        assert found == "country_tks"

    def test_probe_es_typed_key_dt(self):
        fields = {"published_date_dt": "2024-01-01"}
        found = _probe_es_typed_key_for_column("published_date", fields)
        assert found == "published_date_dt"

    def test_probe_es_typed_key_raw(self):
        # Only the raw field is present (no _tks), so the probe returns the raw key.
        fields = {"country_raw": "Brazil"}
        found = _probe_es_typed_key_for_column("country", fields)
        assert found == "country_raw"

    def test_probe_es_typed_key_no_match(self):
        fields = {"other_kwd": "x"}
        assert _probe_es_typed_key_for_column("country", fields) is None

    def test_probe_es_typed_key_empty_col(self):
        # Neither an empty string nor None names a column.
        for blank_column in ("", None):
            assert _probe_es_typed_key_for_column(blank_column, {"a_tks": "x"}) is None
|
||||
|
||||
|
||||
class TestResolveEsChunkFieldKey:
    """``_resolve_es_chunk_field_key`` returns a ``(typed_key, source)`` pair."""

    def test_resolve_es_field_empty_fieldmap_uses_probe(self):
        sample_chunk = {"country_tks": ["tok"]}
        typed_key, source = _resolve_es_chunk_field_key("country", {}, sample_chunk)
        assert typed_key == "country_tks"
        assert source == "probe"

    def test_resolve_es_field_fieldmap_priority(self):
        # The field map wins even though probing would also find "country_tks".
        field_map = {"guojia_tks": "country"}
        sample_chunk = {"guojia_tks": ["x"], "country_tks": ["y"]}
        typed_key, source = _resolve_es_chunk_field_key("country", field_map, sample_chunk)
        assert typed_key == "guojia_tks"
        assert source == "field_map"
|
||||
|
||||
|
||||
class TestEsRawFieldKeyFromTyped:
    """``_es_raw_field_key_from_typed`` maps a ``*_tks`` key to ``*_raw``, otherwise None."""

    def test_es_raw_field_key_from_tks(self):
        raw_key = _es_raw_field_key_from_typed("country_tks")
        assert raw_key == "country_raw"

    def test_es_raw_field_key_from_non_tks(self):
        raw_key = _es_raw_field_key_from_typed("country_dt")
        assert raw_key is None

    def test_es_raw_field_key_from_none(self):
        raw_key = _es_raw_field_key_from_typed(None)
        assert raw_key is None
|
||||
|
||||
|
||||
class TestEsFieldValueToDocMetadata:
    """``_es_field_value_to_doc_metadata`` turns a stored field value into a metadata value."""

    def test_es_field_value_string(self):
        converted = _es_field_value_to_doc_metadata("Brazil", from_tks_fallback=False)
        assert converted == "Brazil"

    def test_es_field_value_list_joined(self):
        tokens = ["hello", "world"]
        converted = _es_field_value_to_doc_metadata(tokens, from_tks_fallback=True)
        assert converted == "hello world"

    def test_es_field_value_empty(self):
        # None, the empty string and the empty list all convert to None.
        for empty_value in (None, "", []):
            assert _es_field_value_to_doc_metadata(empty_value, from_tks_fallback=True) is None
|
||||
|
||||
|
||||
class TestMergeTableParserConfigFromKb:
    """``merge_table_parser_config_from_kb`` combines task and knowledge-base parser configs."""

    def test_merge_table_parser_config_from_kb(self):
        kb_config = {
            "table_column_mode": "manual",
            "table_column_roles": {"a": "metadata"},
            "table_column_names": ["a", "b"],
        }
        task = {
            "parser_id": "table",
            "parser_config": {"llm_id": "x"},
            "kb_parser_config": kb_config,
        }

        merged = merge_table_parser_config_from_kb(task)

        # The table_* keys come from the knowledge base ...
        assert merged["table_column_mode"] == "manual"
        assert merged["table_column_roles"] == {"a": "metadata"}
        assert merged["table_column_names"] == ["a", "b"]
        # ... while the task's own keys are kept.
        assert merged["llm_id"] == "x"

    def test_merge_table_parser_config_auto_default(self):
        task = {
            "parser_id": "table",
            "parser_config": {"foo": 1},
            "kb_parser_config": {"llm_id": "abc"},
        }

        merged = merge_table_parser_config_from_kb(task)

        # The KB config carries no table_* keys, so nothing is copied into the result.
        assert merged == {"foo": 1}
|
||||
|
||||
|
||||
class TestTableParserStripDocMetadataKeys:
    """``table_parser_strip_doc_metadata_keys`` returns the column keys as a frozenset."""

    def test_uses_table_column_names_when_present(self):
        # Surrounding whitespace in a column name is expected to be trimmed.
        effective = {"table_column_names": ["Region", " SKU "]}
        keys = table_parser_strip_doc_metadata_keys(effective)
        assert keys == frozenset({"Region", "SKU"})

    def test_falls_back_to_role_keys_when_no_names(self):
        effective = {"table_column_roles": {"x": "metadata", "y": "indexing"}}
        keys = table_parser_strip_doc_metadata_keys(effective)
        assert keys == frozenset({"x", "y"})

    def test_empty_names_falls_back_to_roles(self):
        effective = {"table_column_names": [], "table_column_roles": {"only": "both"}}
        keys = table_parser_strip_doc_metadata_keys(effective)
        assert keys == frozenset({"only"})
|
||||
230
test/unit_test/rag/svr/test_table_metadata_aggregation.py
Normal file
230
test/unit_test/rag/svr/test_table_metadata_aggregation.py
Normal file
@@ -0,0 +1,230 @@
|
||||
#
|
||||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
"""Unit tests for aggregate_table_manual_doc_metadata."""
|
||||
|
||||
import pytest
|
||||
|
||||
from rag.utils.table_es_metadata import aggregate_table_manual_doc_metadata, merge_table_parser_config_from_kb
|
||||
|
||||
|
||||
@pytest.fixture
def es_engine(monkeypatch):
    """Set both doc-engine flags seen by ``rag.utils.table_es_metadata`` to False.

    NOTE(review): presumably this selects the Elasticsearch code path; confirm
    against the module under test.
    """
    overrides = (
        ("rag.utils.table_es_metadata.settings.DOC_ENGINE_INFINITY", False),
        ("rag.utils.table_es_metadata.settings.DOC_ENGINE_OCEANBASE", False),
    )
    for target, value in overrides:
        monkeypatch.setattr(target, value)
|
||||
|
||||
|
||||
@pytest.fixture
def infinity_engine(monkeypatch):
    """Turn the Infinity flag on and the OceanBase flag off for ``rag.utils.table_es_metadata``."""
    overrides = (
        ("rag.utils.table_es_metadata.settings.DOC_ENGINE_INFINITY", True),
        ("rag.utils.table_es_metadata.settings.DOC_ENGINE_OCEANBASE", False),
    )
    for target, value in overrides:
        monkeypatch.setattr(target, value)
|
||||
|
||||
|
||||
def _table_task(**kb_extra):
|
||||
return {
|
||||
"parser_id": "table",
|
||||
"parser_config": {},
|
||||
"kb_parser_config": {
|
||||
"table_column_mode": "manual",
|
||||
"table_column_roles": {"country": "metadata", "category": "metadata"},
|
||||
"table_column_names": ["country", "category"],
|
||||
"field_map": {
|
||||
"country_tks": "country",
|
||||
"category_tks": "category",
|
||||
},
|
||||
**kb_extra,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestAggregateTableManualDocMetadata:
    """``aggregate_table_manual_doc_metadata`` over hand-built chunk dicts and task dicts."""

    @staticmethod
    def _task(kb_parser_config):
        """Wrap a knowledge-base parser config in a minimal table-parser task dict."""
        return {
            "parser_id": "table",
            "parser_config": {},
            "kb_parser_config": kb_parser_config,
        }

    def test_aggregate_manual_mode_happy_path(self, es_engine):
        rows = (("Brazil", "Economy"), ("Turkey", "Disaster"), ("Brazil", "Economy"))
        chunks = [
            {
                "country_raw": country,
                "category_raw": category,
                "country_tks": "x",
                "category_tks": "y",
            }
            for country, category in rows
        ]
        out = aggregate_table_manual_doc_metadata(chunks, _table_task())
        assert out["country"] == ["Brazil", "Turkey"]
        assert out["category"] == ["Economy", "Disaster"]

    def test_aggregate_auto_mode_returns_empty(self, es_engine):
        task = self._task(
            {
                "table_column_mode": "auto",
                "table_column_roles": {"country": "metadata"},
            }
        )
        out = aggregate_table_manual_doc_metadata([{"country_tks": "x"}], task)
        assert out == {}

    def test_aggregate_no_mode_returns_empty(self, es_engine):
        # No table_column_mode key at all.
        task = self._task({"table_column_roles": {"country": "metadata"}})
        out = aggregate_table_manual_doc_metadata([{}], task)
        assert out == {}

    def test_aggregate_no_metadata_columns(self, es_engine):
        task = self._task(
            {
                "table_column_mode": "manual",
                "table_column_roles": {"country": "indexing"},
                "table_column_names": ["country"],
            }
        )
        out = aggregate_table_manual_doc_metadata([{"country_tks": "x"}], task)
        assert out == {}

    def test_aggregate_prefers_raw_over_tks(self, es_engine):
        task = _table_task(
            table_column_roles={"country": "metadata"},
            table_column_names=["country"],
        )
        chunks = [{"country_raw": "Brazil", "country_tks": ["brazil"]}]
        out = aggregate_table_manual_doc_metadata(chunks, task)
        assert out == {"country": ["Brazil"]}

    def test_aggregate_tks_fallback(self, es_engine):
        task = _table_task(
            table_column_roles={"country": "metadata"},
            table_column_names=["country"],
        )
        # No *_raw field is present, so the value comes from the *_tks field.
        chunks = [{"country_tks": ["brazil"]}]
        out = aggregate_table_manual_doc_metadata(chunks, task)
        assert out == {"country": ["brazil"]}

    def test_aggregate_partial_roles_defaults_to_both(self, es_engine):
        task = self._task(
            {
                "table_column_mode": "manual",
                "table_column_roles": {"country": "indexing"},
                "table_column_names": ["country", "city"],
                "field_map": {"city_tks": "city"},
            }
        )
        chunks = [{"city_raw": "SP", "city_tks": "t", "country_tks": "x"}]
        out = aggregate_table_manual_doc_metadata(chunks, task)
        # "city" has no explicit role and is aggregated; "country" is indexing-only.
        assert out == {"city": ["SP"]}
        assert "country" not in out

    def test_aggregate_empty_roles_all_columns_both(self, es_engine):
        task = self._task(
            {
                "table_column_mode": "manual",
                "table_column_roles": {},
                "table_column_names": ["country", "city"],
                "field_map": {"country_tks": "country", "city_tks": "city"},
            }
        )
        chunks = [
            {"country_raw": "BR", "city_raw": "SP", "country_tks": "x", "city_tks": "y"},
        ]
        out = aggregate_table_manual_doc_metadata(chunks, task)
        for column in ("country", "city"):
            assert column in out

    def test_aggregate_deduplicates_values(self, es_engine):
        task = _table_task(
            table_column_roles={"country": "metadata"},
            table_column_names=["country"],
        )
        chunks = [
            {"country_raw": raw, "country_tks": tks}
            for raw, tks in (("US", "x"), ("UK", "y"), ("US", "x"))
        ]
        out = aggregate_table_manual_doc_metadata(chunks, task)
        # First-seen order is kept and the repeated "US" is dropped.
        assert out["country"] == ["US", "UK"]

    def test_aggregate_kb_reload_field_map(self, es_engine, monkeypatch):
        from unittest.mock import MagicMock

        class MockKBS:
            """Stand-in knowledge-base service whose KB carries the field map."""

            @staticmethod
            def get_by_id(kid):
                kb = MagicMock()
                kb.parser_config = {"field_map": {"country_tks": "country"}}
                return True, kb

        monkeypatch.setattr(
            "rag.utils.table_es_metadata._knowledgebase_service_cls",
            lambda: MockKBS,
        )

        # The task's own KB config has no field_map; only the mocked service does.
        task = self._task(
            {
                "table_column_mode": "manual",
                "table_column_roles": {"country": "metadata"},
                "table_column_names": ["country"],
            }
        )
        task["kb_id"] = "kb-1"

        chunks = [{"country_raw": "X", "country_tks": "t"}]
        out = aggregate_table_manual_doc_metadata(chunks, task)
        assert out == {"country": ["X"]}

    def test_merge_infinity_chunk_data(self, infinity_engine):
        task = self._task(
            {
                "table_column_mode": "manual",
                "table_column_roles": {"country": "both"},
                "table_column_names": ["country"],
            }
        )
        # With the Infinity flag on, values are supplied under "chunk_data".
        chunks = [{"chunk_data": {"country": code}} for code in ("US", "UK")]
        out = aggregate_table_manual_doc_metadata(chunks, task)
        assert out == {"country": ["US", "UK"]}
|
||||
|
||||
|
||||
class TestMergeTableParserConfigFromKbExtra:
    """One explicit merge case kept next to the aggregation tests; the rest live in the helpers file."""

    def test_merge_preserves_parser_config_when_parser_not_table(self):
        # For a non-table parser the KB's table_* keys must not be merged in.
        task = {
            "parser_id": "naive",
            "parser_config": {"a": 1},
            "kb_parser_config": {"table_column_mode": "manual"},
        }
        merged = merge_table_parser_config_from_kb(task)
        assert merged == {"a": 1}
|
||||
@@ -713,6 +713,21 @@ Example: A 1 KB message with 1024-dim embedding uses ~9 KB. The 5 MB default lim
|
||||
portugueseBr: 'Portuguese (Brazil)',
|
||||
embeddingModelPlaceholder: 'Please select a embedding model.',
|
||||
chunkMethodPlaceholder: 'Please select a chunking method.',
|
||||
tableColumnMode: 'Column mode',
|
||||
tableColumnModeAuto: 'Auto',
|
||||
tableColumnModeManual: 'Manual',
|
||||
tableColumnModeAutoDescription:
|
||||
'All columns are included in chunk text and stored as metadata (RAGFlow default).',
|
||||
tableColumnRoles: 'Column roles',
|
||||
tableColumnRolesTip:
|
||||
'Choose which columns to include in chunk text (indexed for vector and full-text search), in metadata only (filterable), or both. Changes apply to new parses; re-parse existing documents for roles to take effect.',
|
||||
tableColumnRoleIndexing: 'Indexing',
|
||||
tableColumnRoleMetadata: 'Metadata',
|
||||
tableColumnRoleBoth: 'Both',
|
||||
tableColumnRolesEmpty:
|
||||
'Upload and parse a CSV or Excel file to begin configuring column roles.',
|
||||
tableColumnRolesReparseTip:
|
||||
'Re-parse existing documents for the new column roles to take effect.',
|
||||
parserLabel: {
|
||||
naive: 'General',
|
||||
qa: 'Q&A',
|
||||
|
||||
@@ -1,12 +1,155 @@
|
||||
import { FormControl, FormItem, FormLabel } from '@/components/ui/form';
|
||||
import { RadioGroup, RadioGroupItem } from '@/components/ui/radio-group';
|
||||
import {
|
||||
Select,
|
||||
SelectContent,
|
||||
SelectItem,
|
||||
SelectTrigger,
|
||||
SelectValue,
|
||||
} from '@/components/ui/select';
|
||||
import { useTranslate } from '@/hooks/common-hooks';
|
||||
import { useFormContext, useWatch } from 'react-hook-form';
|
||||
import { ConfigurationFormContainer } from '../configuration-form-container';
|
||||
|
||||
// Choices offered by each per-column role selector, in display order.
// `labelKey` is the translation key passed to `t()` when an option is rendered.
const ROLE_OPTIONS = [
  { value: 'both', labelKey: 'tableColumnRoleBoth' },
  { value: 'indexing', labelKey: 'tableColumnRoleIndexing' },
  { value: 'metadata', labelKey: 'tableColumnRoleMetadata' },
] as const;
|
||||
|
||||
/**
 * Normalise a stored column role for display in the role selector.
 *
 * A missing or empty role falls back to 'both'; the legacy role 'vectorize'
 * is shown as 'indexing'; any other value is returned unchanged.
 */
function selectTableColumnRoleValue(raw: string | undefined): string {
  const role = raw || 'both';
  if (role === 'vectorize') {
    return 'indexing';
  }
  return role;
}
|
||||
|
||||
/**
 * Table-parser settings panel.
 *
 * Renders an auto/manual column-mode switch. In manual mode it shows one role
 * selector per column listed in `parser_config.table_column_names`, or a hint
 * when that list is empty. All values are read from and written to the
 * enclosing react-hook-form context.
 */
export function TableConfiguration() {
  const form = useFormContext();
  const { t } = useTranslate('knowledgeConfiguration');

  const tableColumnMode = useWatch({
    control: form.control,
    name: 'parser_config.table_column_mode',
    defaultValue: 'auto',
  });
  const tableColumnNames = useWatch({
    control: form.control,
    name: 'parser_config.table_column_names',
    defaultValue: [],
  });
  const tableColumnRoles = useWatch({
    control: form.control,
    name: 'parser_config.table_column_roles',
    defaultValue: {},
  });

  // Anything other than an explicit 'manual' is treated as 'auto'.
  const mode = tableColumnMode === 'manual' ? 'manual' : 'auto';
  const columns: string[] = Array.isArray(tableColumnNames)
    ? tableColumnNames
    : [];

  const handleModeChange = (value: string) => {
    form.setValue(
      'parser_config.table_column_mode',
      value as 'auto' | 'manual',
    );
  };

  // Write the whole role map back so roles already chosen for other columns survive.
  const handleRoleChange = (columnName: string, role: string) => {
    const current =
      (form.getValues('parser_config.table_column_roles') as Record<
        string,
        string
      >) || {};
    form.setValue('parser_config.table_column_roles', {
      ...current,
      [columnName]: role,
    });
  };

  return (
    <ConfigurationFormContainer>
      {/* The mode selector must sit outside any JSX comment: it was previously
          enclosed by a leftover commented-out block and therefore never rendered. */}
      <FormItem className="space-y-2">
        <FormLabel className="text-sm font-medium">
          {t('tableColumnMode')}
        </FormLabel>
        <FormControl>
          <RadioGroup
            value={mode}
            onValueChange={handleModeChange}
            className="flex gap-4"
          >
            <div className="flex items-center space-x-2">
              <RadioGroupItem value="auto" id="table-mode-auto" />
              <label
                htmlFor="table-mode-auto"
                className="text-sm font-normal cursor-pointer"
              >
                {t('tableColumnModeAuto')}
              </label>
            </div>
            <div className="flex items-center space-x-2">
              <RadioGroupItem value="manual" id="table-mode-manual" />
              <label
                htmlFor="table-mode-manual"
                className="text-sm font-normal cursor-pointer"
              >
                {t('tableColumnModeManual')}
              </label>
            </div>
          </RadioGroup>
        </FormControl>
      </FormItem>

      {mode === 'auto' && (
        <p className="text-sm text-muted-foreground">
          {t('tableColumnModeAutoDescription')}
        </p>
      )}

      {mode === 'manual' && columns.length === 0 && (
        <p className="text-sm text-muted-foreground">
          {t('tableColumnRolesEmpty')}
        </p>
      )}

      {mode === 'manual' && columns.length > 0 && (
        <>
          <p className="text-sm text-muted-foreground mb-3">
            {t('tableColumnRolesTip')}
          </p>
          <div className="space-y-3">
            {columns.map((col) => (
              <FormItem key={col} className="flex flex-row items-center gap-4">
                <FormLabel className="min-w-[120px] shrink-0 text-sm font-normal">
                  {col}
                </FormLabel>
                <FormControl>
                  <Select
                    value={selectTableColumnRoleValue(
                      tableColumnRoles && tableColumnRoles[col],
                    )}
                    onValueChange={(value) => handleRoleChange(col, value)}
                  >
                    <SelectTrigger className="w-[160px]">
                      <SelectValue />
                    </SelectTrigger>
                    <SelectContent>
                      {ROLE_OPTIONS.map((opt) => (
                        <SelectItem key={opt.value} value={opt.value}>
                          {t(opt.labelKey)}
                        </SelectItem>
                      ))}
                    </SelectContent>
                  </Select>
                </FormControl>
              </FormItem>
            ))}
          </div>
          <p className="text-xs text-muted-foreground mt-3">
            {t('tableColumnRolesReparseTip')}
          </p>
        </>
      )}
    </ConfigurationFormContainer>
  );
}
|
||||
|
||||
@@ -94,6 +94,18 @@ export const formSchema = z
|
||||
.optional(),
|
||||
enable_metadata: z.boolean().optional(),
|
||||
llm_id: z.string().optional(),
|
||||
// Table parser: "auto" = all columns both, "manual" = use column role selector
|
||||
table_column_mode: z.enum(['auto', 'manual']).optional(),
|
||||
// Table parser: column name -> role (indexing | metadata | both); legacy "vectorize" -> indexing
|
||||
table_column_roles: z
|
||||
.record(
|
||||
z
|
||||
.enum(['indexing', 'metadata', 'both', 'vectorize'])
|
||||
.transform((role) => (role === 'vectorize' ? 'indexing' : role)),
|
||||
)
|
||||
.optional(),
|
||||
// Table parser: column names list (set by backend after first parse)
|
||||
table_column_names: z.array(z.string()).optional(),
|
||||
})
|
||||
.optional(),
|
||||
pagerank: z.number(),
|
||||
|
||||
Reference in New Issue
Block a user