diff --git a/api/utils/validation_utils.py b/api/utils/validation_utils.py index 94e0fa2ab8..063368a299 100644 --- a/api/utils/validation_utils.py +++ b/api/utils/validation_utils.py @@ -377,6 +377,9 @@ class AutoMetadataConfig(Base): built_in_metadata: Annotated[list[AutoMetadataField], Field(default_factory=list)] +TableColumnRole = Literal["indexing", "metadata", "both"] + + class ParserConfig(Base): auto_keywords: Annotated[int, Field(default=0, ge=0, le=32)] auto_questions: Annotated[int, Field(default=0, ge=0, le=10)] @@ -393,6 +396,25 @@ class ParserConfig(Base): task_page_size: Annotated[int | None, Field(default=None, ge=1)] pages: Annotated[list[list[int]] | None, Field(default=None)] ext: Annotated[dict, Field(default={})] + # Table parser: column name -> "indexing" | "metadata" | "both". Absence => all columns "both". + # Table parser: "auto" = all columns both (default), "manual" = use table_column_roles. None → treated as "auto". + table_column_mode: Annotated[Literal["auto", "manual"] | None, Field(default=None)] + # Table parser: column name -> "indexing" | "metadata" | "both". Used only when table_column_mode == "manual". + table_column_roles: Annotated[dict[str, TableColumnRole] | None, Field(default=None)] + # Table parser: list of column names (set by backend after first parse; used by frontend for role selector). + table_column_names: Annotated[list[str] | None, Field(default=None)] + + @field_validator("table_column_roles", mode="before") + @classmethod + def legacy_vectorize_table_column_role(cls, v: Any) -> Any: + """Normalize legacy role value *vectorize* to *indexing* (chunk text + full-text search).""" + if v is None or not isinstance(v, dict): + return v + out: dict[str, Any] = {} + for key, val in v.items(): + k = key if isinstance(key, str) else str(key) + out[k] = "indexing" if val == "vectorize" else val + return out class UpdateDocumentReq(Base): diff --git a/rag/app/table.py b/rag/app/table.py index ea553ca0f9..6ace2f59e1 100644 --- a/rag/app/table.py +++ b/rag/app/table.py @@ -36,6 +36,7 @@ from rag.nlp import rag_tokenizer, tokenize, tokenize_table from deepdoc.parser import ExcelParser from common import settings +logger = logging.getLogger(__name__) class Excel(ExcelParser): def __call__(self, fnm, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER, callback=None, **kwargs): @@ -372,6 +373,11 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER, Every row in table will be treated as a chunk. """ + _pc0 = kwargs.get("parser_config") or {} + logger.debug(f"[TABLE_PARSER_DEBUG] parser_config keys: {list(_pc0.keys())}") + logger.debug(f"[TABLE_PARSER_DEBUG] table_column_mode: {_pc0.get('table_column_mode')}") + logger.debug(f"[TABLE_PARSER_DEBUG] table_column_roles: {_pc0.get('table_column_roles')}") + tbls = [] is_english = lang.lower() == "english" if re.search(r"\.xlsx?$", filename, re.IGNORECASE): @@ -435,6 +441,19 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER, # Field type suffixes for database columns # Maps data types to their database field suffixes fields_map = {"text": "_tks", "int": "_long", "keyword": "_kwd", "float": "_flt", "datetime": "_dt", "bool": "_kwd"} + parser_config = kwargs.get("parser_config") or {} + if parser_config.get("table_column_mode") == "manual": + column_roles = parser_config.get("table_column_roles") or {} + else: + column_roles = {} + logger.debug( + f"[TABLE_PARSER_DEBUG] effective table_column_mode={parser_config.get('table_column_mode')!r}, " + f"column_roles keys={list(column_roles.keys())}" + ) + + # Pass 1: infer columns per sheet (multi-sheet Excel => multiple DataFrames). Merge field_map and + # table_column_names, then update KB once so the UI role selector sees all columns, not only the last sheet. + sheet_specs = [] for df in dfs: for n in ["id", "_id", "index", "idx"]: if n in df.columns: @@ -457,22 +476,64 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER, txts.extend([str(c) for c in cln if c]) clmns_map = [(py_clmns[i].lower() + fields_map[clmn_tys[i]], str(clmns[i]).replace("_", " ")) for i in range(len(clmns))] - # For Infinity/OceanBase: Use original column names as keys since they're stored in chunk_data JSON - # For ES/OS: Use full field names with type suffixes (e.g., url_kwd, body_tks) + # field_map: only columns stored in chunk_data (metadata or both) — used for retrieval/SQL + stored_indices = [ + i for i in range(len(clmns)) + if column_roles.get(clmns[i], "both") in ("metadata", "both") + ] if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE: - # For Infinity/OceanBase: key = original column name, value = display name - field_map = {py_clmns[i].lower(): str(clmns[i]).replace("_", " ") for i in range(len(clmns))} + field_map = { + py_clmns[i].lower(): str(clmns[i]).replace("_", " ") + for i in stored_indices + } else: - # For ES/OS: key = typed field name, value = display name - field_map = {k: v for k, v in clmns_map} - logging.debug(f"Field map: {field_map}") - KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": field_map}) + field_map = { + clmns_map[i][0]: clmns_map[i][1] + for i in stored_indices + } + logging.debug(f"Field map (sheet): {field_map}") + sheet_specs.append( + { + "df": df, + "clmns": clmns, + "clmn_tys": clmn_tys, + "clmns_map": clmns_map, + "py_clmns": py_clmns, + "field_map": field_map, + } + ) - eng = lang.lower() == "english" # is_english(txts) + merged_field_map = {} + merged_table_column_names = [] + seen_col = set() + for spec in sheet_specs: + merged_field_map.update(spec["field_map"]) + for col in spec["clmns"]: + if col not in seen_col: + seen_col.add(col) + merged_table_column_names.append(col) + + logging.debug(f"Field map (merged across sheets): {merged_field_map}") + kb_id = kwargs.get("kb_id") + if kb_id: + KnowledgebaseService.update_parser_config( + kb_id, + {"field_map": merged_field_map, "table_column_names": merged_table_column_names}, + ) + + eng = lang.lower() == "english" # is_english(txts) + for spec in sheet_specs: + df = spec["df"] + clmns = spec["clmns"] + clmn_tys = spec["clmn_tys"] + clmns_map = spec["clmns_map"] + py_clmns = spec["py_clmns"] + _debug_row_idx = 0 for ii, row in df.iterrows(): + _debug_row_idx += 1 d = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))} - row_fields = [] - data_json = {} # For Infinity: Store all columns in a JSON object + text_fields = [] # indexing + both -> content_with_weight + stored = {} # metadata + both -> chunk_data (Infinity) or typed fields (ES) for j in range(len(clmns)): if row[clmns[j]] is None: continue @@ -480,27 +541,49 @@ def chunk(filename, binary=None, from_page=0, to_page=MAXIMUM_TASK_PAGE_NUMBER, continue if not isinstance(row[clmns[j]], pd.Series) and pd.isna(row[clmns[j]]): continue - # For Infinity/OceanBase: Store in chunk_data JSON column - # For Elasticsearch/OpenSearch: Store as individual fields with type suffixes - if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE: - data_json[str(clmns[j])] = row[clmns[j]] - else: - fld = clmns_map[j][0] - d[fld] = row[clmns[j]] if clmn_tys[j] != "text" else rag_tokenizer.tokenize(row[clmns[j]]) - row_fields.append((clmns[j], row[clmns[j]])) - if not row_fields: + col_name = clmns[j] + role = column_roles.get(col_name, "both") + if _debug_row_idx == 1: + logger.debug(f"[TABLE_PARSER_DEBUG] Column '{col_name}' -> role '{role}'") + if role in ("indexing", "vectorize", "both"): + text_fields.append((col_name, row[col_name])) + if role in ("metadata", "both"): + if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE: + stored[str(col_name)] = row[col_name] + else: + fld = clmns_map[j][0] + if clmn_tys[j] != "text": + stored[fld] = row[col_name] + else: + cell = row[col_name] + stored[fld] = rag_tokenizer.tokenize(cell) + raw_s = str(cell).strip() if cell is not None else "" + if raw_s: + stored[f"{py_clmns[j].lower()}_raw"] = raw_s + if not text_fields and not stored: continue - # Add the data JSON field to the document (for Infinity/OceanBase) if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE: - d["chunk_data"] = data_json - # Format as a structured text for better LLM comprehension - # Format each field as "- Field Name: Value" on separate lines - formatted_text = "\n".join([f"- {field}: {value}" for field, value in row_fields]) + if stored: + d["chunk_data"] = stored + else: + d.update(stored) + formatted_text = "\n".join([f"- {field}: {value}" for field, value in text_fields]) if text_fields else "" tokenize(d, formatted_text, eng) + if _debug_row_idx == 1: + logger.debug( + f"[TABLE_PARSER_DEBUG] Chunk content_with_weight length: {len(d.get('content_with_weight', '') or '')}" + ) + _cd = d.get("chunk_data") + logger.debug( + f"[TABLE_PARSER_DEBUG] Chunk chunk_data keys: {list(_cd.keys()) if isinstance(_cd, dict) else 'N/A'}" + ) + if not (settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE): + _extra = [k for k in d if k not in ("docnm_kwd", "title_tks", "content_with_weight", "content_ltks", "content_sm_ltks")] + logger.debug(f"[TABLE_PARSER_DEBUG] Chunk ES extra field keys (sample): {_extra[:20]}") res.append(d) - if tbls: - doc = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))} - res.extend(tokenize_table(tbls, doc, is_english)) + if tbls: + doc = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))} + res.extend(tokenize_table(tbls, doc, is_english)) callback(0.35, "") return res diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 4d56327842..2568aa036b 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -79,9 +79,15 @@ from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc from common.exceptions import TaskCanceledException from common import settings from common.constants import PAGERANK_FLD, TAG_FLD, SVR_CONSUMER_GROUP_NAME +from rag.utils.table_es_metadata import ( + aggregate_table_manual_doc_metadata, + merge_table_parser_config_from_kb, + table_parser_strip_doc_metadata_keys, +) BATCH_SIZE = 64 + FACTORY = { "general": naive, ParserType.NAIVE.value: naive, @@ -268,6 +274,16 @@ async def build_chunks(task, progress_callback): logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"])) raise + # Table parser column roles / mode are stored on the dataset (KB) parser_config; + # chunk tasks carry document-level parser_config only — merge KB keys so manual roles apply. + parser_config_for_chunk = merge_table_parser_config_from_kb(task) + if task.get("parser_id", "").lower() == "table" and task.get("kb_parser_config"): + logging.debug( + "[TASK_EXECUTOR_DEBUG] table parser: merged KB keys into parser_config for chunk; " + f"mode={parser_config_for_chunk.get('table_column_mode')}, " + f"roles_keys={list((parser_config_for_chunk.get('table_column_roles') or {}).keys())}" + ) + try: async with chunk_limiter: cks = await thread_pool_exec( @@ -279,7 +295,7 @@ async def build_chunks(task, progress_callback): lang=task["language"], callback=progress_callback, kb_id=task["kb_id"], - parser_config=task["parser_config"], + parser_config=parser_config_for_chunk, tenant_id=task["tenant_id"], ) logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"])) @@ -1262,6 +1278,43 @@ async def do_handle_task(task): DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0) + # Table parser (manual): push metadata/both column values to document-level metadata for UI / chat filters + if task.get("parser_id", "").lower() == "table": + eff_pc = merge_table_parser_config_from_kb(task) + logging.debug( + f"[TABLE_META_DEBUG] table post-index: table_column_mode={eff_pc.get('table_column_mode')!r}" + ) + if eff_pc.get("table_column_mode") == "manual": + try: + agg = aggregate_table_manual_doc_metadata(chunks, task) + logging.debug(f"[TABLE_META_DEBUG] aggregated metadata: {agg}") + strip_keys = table_parser_strip_doc_metadata_keys(eff_pc) + existing = DocMetadataService.get_document_metadata(task_doc_id) + existing = existing if isinstance(existing, dict) else {} + preserved = {k: v for k, v in existing.items() if k not in strip_keys} + merged = update_metadata_to(dict(preserved), agg) + logging.debug( + f"[TABLE_META_DEBUG] calling update_document_metadata for doc_id={task_doc_id}, " + f"meta_fields keys={list(merged.keys())}, " + f"table_strip_key_count={len(strip_keys)}, agg_keys={list(agg.keys())}" + ) + try: + DocMetadataService.update_document_metadata(task_doc_id, merged) + logging.debug("[TABLE_META_DEBUG] update_document_metadata succeeded") + except Exception as ue: + logging.error( + "update_document_metadata failed (table parser, doc_id=%s): %s", + task_doc_id, + ue, + exc_info=True, + ) + except Exception as e: + logging.exception( + "Table parser document metadata aggregation failed (doc_id=%s): %s", + task_doc_id, + e, + ) + progress_callback(msg="Indexing done ({:.2f}s).".format(timer() - start_ts)) if toc_thread: diff --git a/rag/utils/table_es_metadata.py b/rag/utils/table_es_metadata.py new file mode 100644 index 0000000000..18edfc4696 --- /dev/null +++ b/rag/utils/table_es_metadata.py @@ -0,0 +1,296 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Table manual-mode ES field resolution and document metadata aggregation (lightweight; used by task_executor).""" + +import logging + +from common import settings +from common.metadata_utils import dedupe_list + + +def _knowledgebase_service_cls(): + """Lazy import for KnowledgebaseService (used by aggregate; mockable in unit tests).""" + from api.db.services.knowledgebase_service import KnowledgebaseService + + return KnowledgebaseService + + +def merge_table_parser_config_from_kb(task: dict) -> dict: + """Merge dataset-level table parser keys into document parser_config (see build_chunks).""" + pc = task.get("parser_config") or {} + if task.get("parser_id", "").lower() != "table" or not task.get("kb_parser_config"): + return pc + out = dict(pc) + kb_pc = task["kb_parser_config"] + for _k in ("table_column_mode", "table_column_roles", "table_column_names"): + if _k in kb_pc: + out[_k] = kb_pc[_k] + return out + + +def table_parser_strip_doc_metadata_keys(eff_parser_config: dict) -> frozenset[str]: + """ + Table manual mode stores per-column values under document metadata keys equal to the + CSV column name. On reparse, strip these keys from existing metadata before merging + a fresh aggregate so columns switched to indexing-only (or removed) do not persist. + """ + names = eff_parser_config.get("table_column_names") + if names: + return frozenset(str(n).strip() for n in names if n is not None and str(n).strip()) + roles = eff_parser_config.get("table_column_roles") or {} + return frozenset(str(k).strip() for k in roles if k is not None and str(k).strip()) + + +def _field_map_typed_key_for_column(field_map: dict, col: str) -> str | None: + """Map CSV column name to ES typed field key (field_map: typed_key -> display name).""" + if not field_map or not col: + return None + col_s = str(col).strip() + col_norm = col_s.replace("_", " ").strip().lower() + for tk, disp in field_map.items(): + disp_s = str(disp).strip() + if disp_s.lower() == col_norm or disp_s.lower() == col_s.lower(): + return tk + return None + + +def _probe_es_typed_key_for_column(col: str, sample_chunk: dict) -> str | None: + """ + When field_map is missing/stale, try to infer the ES field key present on a chunk. + Table chunks use normalized/pinyin keys of the form , where suffix is + one of: _raw, _tks, _dt, _long, _flt, _kwd (see rag/app/table.py). + """ + if not col or not isinstance(sample_chunk, dict): + return None + base_raw = str(col).strip() + if not base_raw: + return None + base_norm = base_raw.replace("_", " ").strip().lower().replace(" ", "") + suffixes = ("_tks", "_raw", "_dt", "_long", "_flt", "_kwd") + for key in sample_chunk.keys(): + key_s = str(key) + if not key_s: + continue + key_norm = key_s.strip().lower() + if key_norm == base_raw.lower() or key_norm.replace("_", "").replace(" ", "") == base_norm: + return key_s + for key in sample_chunk.keys(): + key_s = str(key) + if not key_s: + continue + key_lower = key_s.lower() + for sfx in suffixes: + if key_lower.endswith(sfx): + core = key_lower[: -len(sfx)] + core_norm = core.replace("_", "").replace(" ", "") + if core_norm == base_norm: + return key_s + return None + + +def _resolve_es_chunk_field_key( + col: str, field_map: dict, sample_chunk: dict | None +) -> tuple[str | None, str]: + """Prefer field_map when key exists on chunk; else probe by suffix (matches table.py naming).""" + tk_fm = _field_map_typed_key_for_column(field_map, col) if field_map else None + if sample_chunk: + if tk_fm and tk_fm in sample_chunk: + return tk_fm, "field_map" + probed = _probe_es_typed_key_for_column(col, sample_chunk) + if probed: + return probed, "probe" if not tk_fm else "probe_field_map_mismatch" + if tk_fm: + return tk_fm, "field_map_absent_on_chunk" + if tk_fm: + return tk_fm, "field_map" + return None, "none" + + +def _value_to_meta_string(val) -> str | None: + """Normalize chunk field values for DocMetadataService (strings / list of strings only).""" + if val is None: + return None + if isinstance(val, bool): + return str(val).lower() + if isinstance(val, (int, float)): + return str(val) + if isinstance(val, str): + s = val.strip() + return s if s else None + return str(val) + + +def _es_raw_field_key_from_typed(tk: str | None) -> str | None: + """ES text columns use *_tks (tokenized); raw display value is stored as {same_base}_raw (see rag/app/table.py).""" + if not tk or not tk.endswith("_tks"): + return None + return tk[: -len("_tks")] + "_raw" + + +def _es_field_value_to_doc_metadata(val, *, from_tks_fallback: bool) -> str | None: + """Prefer raw strings; for legacy *_tks tokenized fields, normalize list/str to a single display string.""" + if val is None: + return None + if from_tks_fallback and isinstance(val, list): + parts = [str(x).strip() for x in val if x is not None and str(x).strip()] + if not parts: + return None + return " ".join(parts) + return _value_to_meta_string(val) + + +def aggregate_table_manual_doc_metadata(chunks: list, task: dict) -> dict: + """ + Collect unique values per metadata/both column across chunks for document-level metadata. + Used when table_column_mode == manual (parallel to LLM gen_metadata, no schema required). + """ + logging.debug( + f"[TABLE_META_DEBUG] aggregate_table_manual_doc_metadata called with {len(chunks)} chunks" + ) + eff = merge_table_parser_config_from_kb(task) + if eff.get("table_column_mode") != "manual": + logging.debug( + f"[TABLE_META_DEBUG] skip aggregate: table_column_mode={eff.get('table_column_mode')!r}" + ) + return {} + roles = eff.get("table_column_roles") or {} + table_column_names = eff.get("table_column_names") or [] + if table_column_names: + meta_cols = [ + col + for col in table_column_names + if roles.get(col, "both") in ("metadata", "both") + ] + else: + meta_cols = [c for c, r in roles.items() if r in ("metadata", "both")] + if not meta_cols: + logging.debug( + "[TABLE_META_DEBUG] skip aggregate: no metadata/both columns " + f"(table_column_names_present={bool(table_column_names)})" + ) + return {} + fm = (task.get("kb_parser_config") or {}).get("field_map") or {} + kb_id = task.get("kb_id") + if not fm and kb_id: + try: + KBS = _knowledgebase_service_cls() + ok, kb = KBS.get_by_id(kb_id) + if ok and kb: + fresh_pc = kb.parser_config or {} + reloaded = fresh_pc.get("field_map") or {} + if reloaded: + fm = reloaded + logging.debug( + f"[TABLE_META_DEBUG] reloaded field_map from DB: {len(fm)} entries" + ) + else: + logging.debug( + "[TABLE_META_DEBUG] KB reload: parser_config has no field_map yet; " + "will use ES key probe on chunk dicts if applicable" + ) + except Exception as e: + logging.debug( + "[TABLE_META_DEBUG] failed to reload field_map from DB: %s", + e, + exc_info=True, + ) + if not fm and not (settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE): + logging.debug( + "[TABLE_META_DEBUG] field_map empty on task snapshot — will use ES key probe on chunk dicts; " + f"kb_parser_config keys={list((task.get('kb_parser_config') or {}).keys())}" + ) + logging.debug( + f"[TABLE_META_DEBUG] meta_cols={meta_cols}, field_map entries={len(fm)}, " + f"infinity={settings.DOC_ENGINE_INFINITY}, oceanbase={settings.DOC_ENGINE_OCEANBASE}" + ) + sample_ck = next((c for c in chunks if isinstance(c, dict)), None) + if sample_ck: + sk = [ + k + for k in sample_ck.keys() + if not (str(k).startswith("q_") and str(k).endswith("_vec")) + ][:50] + logging.debug(f"[TABLE_META_DEBUG] first chunk non-vector keys (sample): {sk}") + + es_col_keys: dict[str, tuple[str | None, str]] = {} + if not (settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE): + for col in meta_cols: + tk, src = _resolve_es_chunk_field_key(col, fm, sample_ck) + es_col_keys[col] = (tk, src) + logging.debug( + f"[TABLE_META_DEBUG] column '{col}' -> ES key {tk!r} (source={src})" + ) + + acc: dict[str, list] = {c: [] for c in meta_cols} + + for i, ck in enumerate(chunks): + if not isinstance(ck, dict): + continue + if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE: + cd = ck.get("chunk_data") + if not isinstance(cd, dict): + continue + for col in meta_cols: + if col not in cd: + continue + s = _value_to_meta_string(cd[col]) + if s is not None: + acc[col].append(s) + else: + for col in meta_cols: + tk, _src = es_col_keys.get(col, (None, "none")) + if not tk: + if i == 0: + logging.debug( + f"[TABLE_META_DEBUG] no resolved ES key for column '{col}'" + ) + continue + raw_k = _es_raw_field_key_from_typed(tk) + val = None + from_tks = False + if raw_k and raw_k in ck: + val = ck[raw_k] + elif tk in ck: + val = ck[tk] + from_tks = tk.endswith("_tks") + else: + if i == 0: + logging.debug( + f"[TABLE_META_DEBUG] chunk missing ES field {tk!r}" + f"{' and ' + raw_k + ' (raw)' if raw_k else ''} for column '{col}'" + ) + continue + s = _es_field_value_to_doc_metadata(val, from_tks_fallback=from_tks) + if s is not None: + acc[col].append(s) + + for col, vals in acc.items(): + logging.debug( + "[TABLE_META_DEBUG] Column '%s' values found (count=%d)", + col, + len(vals), + ) + + out = {} + for col, vals in acc.items(): + if vals: + out[col] = dedupe_list(vals) + logging.debug( + f"[TABLE_META_DEBUG] aggregated metadata dict keys={list(out.keys())}, " + f"sizes={[len(v) for v in out.values()]}" + ) + return out diff --git a/test/unit_test/api/utils/test_doc_validation.py b/test/unit_test/api/utils/test_doc_validation.py index 25e115c429..b068e2b499 100644 --- a/test/unit_test/api/utils/test_doc_validation.py +++ b/test/unit_test/api/utils/test_doc_validation.py @@ -18,14 +18,15 @@ from unittest.mock import Mock from api.utils.validation_utils import ( - validate_immutable_fields, + ParserConfig, + UpdateDocumentReq, + validate_chunk_method, validate_document_name, - validate_chunk_method + validate_immutable_fields, ) from api.constants import FILE_NAME_LEN_LIMIT from api.db import FileType from common.constants import RetCode -from api.utils.validation_utils import UpdateDocumentReq def test_validate_immutable_fields_no_changes(): @@ -299,4 +300,15 @@ def test_validate_chunk_method_other_extensions_still_valid(): error_msg, error_code = validate_chunk_method(doc) assert error_msg is None - assert error_code is None \ No newline at end of file + assert error_code is None + + +def test_parser_config_normalizes_legacy_vectorize_table_column_role(): + p = ParserConfig( + table_column_roles={"title": "vectorize", "country": "metadata", "x": "both"}, + ) + assert p.table_column_roles == { + "title": "indexing", + "country": "metadata", + "x": "both", + } \ No newline at end of file diff --git a/test/unit_test/rag/app/__init__.py b/test/unit_test/rag/app/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/unit_test/rag/app/test_table_chunk_column_roles.py b/test/unit_test/rag/app/test_table_chunk_column_roles.py new file mode 100644 index 0000000000..40eed2ae5b --- /dev/null +++ b/test/unit_test/rag/app/test_table_chunk_column_roles.py @@ -0,0 +1,235 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. See the License +# for the specific language governing permissions and limitations under +# the License. +# + +"""Integration-style tests for rag.app.table.chunk() column roles (mocked KB + tokenizer).""" + +from __future__ import annotations + +import sys +from unittest.mock import MagicMock, patch + +# Mock heavy modules that trigger ONNX model loading at import time +# table.py -> deepdoc.parser.figure_parser -> rag.app.picture -> OCR() +for mod in [ + "deepdoc.vision.ocr", + "deepdoc.parser.figure_parser", + "rag.app.picture", +]: + if mod not in sys.modules: + sys.modules[mod] = MagicMock() + +import warnings + +# Importing rag.app.table pulls api -> rag.llm -> deepdoc -> xgboost; xgboost may warn on +# pkg_resources in a way that breaks its compat shim unless pkg_resources loads first. +warnings.filterwarnings("ignore", message=".*pkg_resources is deprecated.*", category=UserWarning) +import pkg_resources # noqa: F401 — stabilize xgboost import during collection + +import pytest + +import common.settings as settings +from rag.app.table import chunk + +# chunk() removes columns named id, _id, index, idx — use row_id instead of id. +TEST_CSV = b"""row_id,title,content,country,category +1,Earthquake hits Turkey,A 5.8 magnitude earthquake struck Konya,Turkey,Disaster +2,Oil prices surge,Brent crude jumped 4.2 percent,Global,Economy +3,AI regulation proposed,EU unveiled a draft regulation,EU,Technology +""" + +FILENAME = "test.csv" +KB_ID = "test_kb_id" + + +def _noop_callback(*_a, **_k): + pass + + +@pytest.fixture(autouse=True) +def _es_doc_engine(monkeypatch): + monkeypatch.setattr(settings, "DOC_ENGINE_INFINITY", False) + monkeypatch.setattr(settings, "DOC_ENGINE_OCEANBASE", False) + + +@pytest.fixture(autouse=True) +def _stub_rag_tokenizer(monkeypatch): + """Avoid NLTK / infinity tokenizer deps; keep string content inspectable.""" + + def fake_tokenize(line): + return str(line) + + monkeypatch.setattr("rag.nlp.rag_tokenizer.tokenize", fake_tokenize) + monkeypatch.setattr("rag.nlp.rag_tokenizer.fine_grained_tokenize", fake_tokenize) + + +@pytest.fixture +def mock_update_kb(): + with patch("rag.app.table.KnowledgebaseService.update_parser_config") as m: + yield m + + +def _run_chunk(parser_config: dict, mock_update_kb: MagicMock): + return chunk( + FILENAME, + binary=TEST_CSV, + callback=_noop_callback, + kb_id=KB_ID, + parser_config=parser_config, + lang="Chinese", + ) + + +def test_chunk_auto_mode_all_columns_in_text_and_stored(mock_update_kb: MagicMock): + parser_config: dict = {} + chunks = _run_chunk(parser_config, mock_update_kb) + assert len(chunks) == 3 + first = chunks[0] + cww = first["content_with_weight"] + assert "Earthquake hits Turkey" in cww + assert "Konya" in cww + assert "Turkey" in cww + assert "Disaster" in cww + assert "1" in cww or "row_id" in cww + # ES path: stored typed fields for text columns include *_tks and *_raw; row_id is int -> *_long + assert "row_id_long" in first + assert "title_raw" in first and "country_raw" in first + + +def test_chunk_manual_mode_indexing_only(mock_update_kb: MagicMock): + parser_config = { + "table_column_mode": "manual", + "table_column_roles": { + "title": "indexing", + "content": "indexing", + "row_id": "metadata", + "country": "metadata", + "category": "metadata", + }, + } + chunks = _run_chunk(parser_config, mock_update_kb) + first = chunks[0] + cww = first["content_with_weight"] + assert "- title:" in cww and "Earthquake" in cww + assert "- content:" in cww and "Konya" in cww + assert "- country:" not in cww + assert "- category:" not in cww + assert "- row_id:" not in cww + # Column title/content not stored as table fields + assert "title_raw" not in first + assert "content_raw" not in first + assert "country_raw" in first and "category_raw" in first + assert "row_id_long" in first + + +def test_chunk_manual_mode_legacy_vectorize_role(mock_update_kb: MagicMock): + """Stored configs may still use role *vectorize*; chunking treats it like *indexing*.""" + parser_config = { + "table_column_mode": "manual", + "table_column_roles": { + "title": "vectorize", + "content": "indexing", + "row_id": "metadata", + "country": "metadata", + "category": "metadata", + }, + } + chunks = _run_chunk(parser_config, mock_update_kb) + first = chunks[0] + cww = first["content_with_weight"] + assert "- title:" in cww and "Earthquake" in cww + assert "- content:" in cww and "Konya" in cww + assert "- country:" not in cww + + +def test_chunk_manual_mode_metadata_only(mock_update_kb: MagicMock): + parser_config = { + "table_column_mode": "manual", + "table_column_roles": { + "title": "metadata", + "content": "metadata", + "row_id": "metadata", + "country": "metadata", + "category": "metadata", + }, + } + chunks = _run_chunk(parser_config, mock_update_kb) + first = chunks[0] + assert (first.get("content_with_weight") or "").strip() == "" + assert "country_raw" in first and "title_raw" in first + + +def test_chunk_manual_mode_both(mock_update_kb: MagicMock): + parser_config = { + "table_column_mode": "manual", + "table_column_roles": {c: "both" for c in ["title", "content", "country", "category", "row_id"]}, + } + chunks = _run_chunk(parser_config, mock_update_kb) + first = chunks[0] + cww = first["content_with_weight"] + assert "Earthquake hits Turkey" in cww + assert "Turkey" in cww + assert "Disaster" in cww + assert "row_id_long" in first + assert "title_raw" in first and "country_raw" in first + + +def test_chunk_manual_mode_partial_roles_default_to_both(mock_update_kb: MagicMock): + parser_config = { + "table_column_mode": "manual", + "table_column_roles": { + "title": "indexing", + "country": "metadata", + }, + } + chunks = _run_chunk(parser_config, mock_update_kb) + first = chunks[0] + cww = first["content_with_weight"] + assert "- title:" in cww and "Earthquake" in cww + assert "- country:" not in cww + assert "- row_id:" in cww + assert "- content:" in cww + assert "- category:" in cww + assert "title_raw" not in first + assert "country_raw" in first and "country_tks" in first + assert "content_raw" in first and "category_raw" in first + + +def test_chunk_manual_mode_raw_fields_for_es(mock_update_kb: MagicMock): + parser_config = { + "table_column_mode": "manual", + "table_column_roles": {c: "both" for c in ["title", "content", "country", "category", "row_id"]}, + } + chunks = _run_chunk(parser_config, mock_update_kb) + first = chunks[0] + for col in ("title", "content", "country", "category"): + assert f"{col}_raw" in first + assert f"{col}_tks" in first + + +def test_chunk_updates_table_column_names(mock_update_kb: MagicMock): + _run_chunk({}, mock_update_kb) + mock_update_kb.assert_called_once() + args, kwargs = mock_update_kb.call_args + assert args[0] == KB_ID + payload = args[1] + names = payload["table_column_names"] + assert names == ["row_id", "title", "content", "country", "category"] + + +def test_chunk_count_matches_row_count(mock_update_kb: MagicMock): + chunks = _run_chunk({}, mock_update_kb) + assert len(chunks) == 3 diff --git a/test/unit_test/rag/svr/__init__.py b/test/unit_test/rag/svr/__init__.py new file mode 100644 index 0000000000..895bd9cee4 --- /dev/null +++ b/test/unit_test/rag/svr/__init__.py @@ -0,0 +1 @@ +# Unit tests for rag/svr diff --git a/test/unit_test/rag/svr/test_table_column_roles_helpers.py b/test/unit_test/rag/svr/test_table_column_roles_helpers.py new file mode 100644 index 0000000000..fe4eed27fe --- /dev/null +++ b/test/unit_test/rag/svr/test_table_column_roles_helpers.py @@ -0,0 +1,132 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for ES table metadata helpers (rag.utils.table_es_metadata).""" + +from rag.utils.table_es_metadata import ( + _es_field_value_to_doc_metadata, + _es_raw_field_key_from_typed, + _probe_es_typed_key_for_column, + _resolve_es_chunk_field_key, + merge_table_parser_config_from_kb, + table_parser_strip_doc_metadata_keys, +) + + +class TestProbeEsTypedKeyForColumn: + def test_probe_es_typed_key_tks(self): + chunk = {"country_tks": "tok", "other": 1} + assert _probe_es_typed_key_for_column("country", chunk) == "country_tks" + + def test_probe_es_typed_key_dt(self): + chunk = {"published_date_dt": "2024-01-01"} + assert _probe_es_typed_key_for_column("published_date", chunk) == "published_date_dt" + + def test_probe_es_typed_key_raw(self): + # Only raw field present (no _tks) — probe returns the raw key + chunk = {"country_raw": "Brazil"} + assert _probe_es_typed_key_for_column("country", chunk) == "country_raw" + + def test_probe_es_typed_key_no_match(self): + chunk = {"other_kwd": "x"} + assert _probe_es_typed_key_for_column("country", chunk) is None + + def test_probe_es_typed_key_empty_col(self): + assert _probe_es_typed_key_for_column("", {"a_tks": "x"}) is None + assert _probe_es_typed_key_for_column(None, {"a_tks": "x"}) is None + + +class TestResolveEsChunkFieldKey: + def test_resolve_es_field_empty_fieldmap_uses_probe(self): + sample = {"country_tks": ["tok"]} + tk, src = _resolve_es_chunk_field_key("country", {}, sample) + assert tk == "country_tks" + assert src == "probe" + + def test_resolve_es_field_fieldmap_priority(self): + fm = {"guojia_tks": "country"} + sample = {"guojia_tks": ["x"], "country_tks": ["y"]} + tk, src = _resolve_es_chunk_field_key("country", fm, sample) + assert tk == "guojia_tks" + assert src == "field_map" + + +class TestEsRawFieldKeyFromTyped: + def test_es_raw_field_key_from_tks(self): + assert _es_raw_field_key_from_typed("country_tks") == "country_raw" + + def test_es_raw_field_key_from_non_tks(self): + assert _es_raw_field_key_from_typed("country_dt") is None + + def test_es_raw_field_key_from_none(self): + assert _es_raw_field_key_from_typed(None) is None + + +class TestEsFieldValueToDocMetadata: + def test_es_field_value_string(self): + assert _es_field_value_to_doc_metadata("Brazil", from_tks_fallback=False) == "Brazil" + + def test_es_field_value_list_joined(self): + assert ( + _es_field_value_to_doc_metadata(["hello", "world"], from_tks_fallback=True) + == "hello world" + ) + + def test_es_field_value_empty(self): + assert _es_field_value_to_doc_metadata(None, from_tks_fallback=True) is None + assert _es_field_value_to_doc_metadata("", from_tks_fallback=True) is None + assert _es_field_value_to_doc_metadata([], from_tks_fallback=True) is None + + +class TestMergeTableParserConfigFromKb: + def test_merge_table_parser_config_from_kb(self): + task = { + "parser_id": "table", + "parser_config": {"llm_id": "x"}, + "kb_parser_config": { + "table_column_mode": "manual", + "table_column_roles": {"a": "metadata"}, + "table_column_names": ["a", "b"], + }, + } + merged = merge_table_parser_config_from_kb(task) + assert merged["table_column_mode"] == "manual" + assert merged["table_column_roles"] == {"a": "metadata"} + assert merged["table_column_names"] == ["a", "b"] + assert merged["llm_id"] == "x" + + def test_merge_table_parser_config_auto_default(self): + task = { + "parser_id": "table", + "parser_config": {"foo": 1}, + "kb_parser_config": {"llm_id": "abc"}, + } + merged = merge_table_parser_config_from_kb(task) + assert merged == {"foo": 1} # no table_* keys copied from kb without kb_parser_config keys + + +class TestTableParserStripDocMetadataKeys: + def test_uses_table_column_names_when_present(self): + eff = {"table_column_names": ["Region", " SKU "]} + assert table_parser_strip_doc_metadata_keys(eff) == frozenset({"Region", "SKU"}) + + def test_falls_back_to_role_keys_when_no_names(self): + eff = {"table_column_roles": {"x": "metadata", "y": "indexing"}} + assert table_parser_strip_doc_metadata_keys(eff) == frozenset({"x", "y"}) + + def test_empty_names_falls_back_to_roles(self): + eff = {"table_column_names": [], "table_column_roles": {"only": "both"}} + assert table_parser_strip_doc_metadata_keys(eff) == frozenset({"only"}) diff --git a/test/unit_test/rag/svr/test_table_metadata_aggregation.py b/test/unit_test/rag/svr/test_table_metadata_aggregation.py new file mode 100644 index 0000000000..59d2f7ee47 --- /dev/null +++ b/test/unit_test/rag/svr/test_table_metadata_aggregation.py @@ -0,0 +1,230 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for aggregate_table_manual_doc_metadata.""" + +import pytest + +from rag.utils.table_es_metadata import aggregate_table_manual_doc_metadata, merge_table_parser_config_from_kb + + +@pytest.fixture +def es_engine(monkeypatch): + monkeypatch.setattr("rag.utils.table_es_metadata.settings.DOC_ENGINE_INFINITY", False) + monkeypatch.setattr("rag.utils.table_es_metadata.settings.DOC_ENGINE_OCEANBASE", False) + + +@pytest.fixture +def infinity_engine(monkeypatch): + monkeypatch.setattr("rag.utils.table_es_metadata.settings.DOC_ENGINE_INFINITY", True) + monkeypatch.setattr("rag.utils.table_es_metadata.settings.DOC_ENGINE_OCEANBASE", False) + + +def _table_task(**kb_extra): + return { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_mode": "manual", + "table_column_roles": {"country": "metadata", "category": "metadata"}, + "table_column_names": ["country", "category"], + "field_map": { + "country_tks": "country", + "category_tks": "category", + }, + **kb_extra, + }, + } + + +class TestAggregateTableManualDocMetadata: + def test_aggregate_manual_mode_happy_path(self, es_engine): + task = _table_task() + chunks = [ + { + "country_raw": "Brazil", + "category_raw": "Economy", + "country_tks": "x", + "category_tks": "y", + }, + { + "country_raw": "Turkey", + "category_raw": "Disaster", + "country_tks": "x", + "category_tks": "y", + }, + { + "country_raw": "Brazil", + "category_raw": "Economy", + "country_tks": "x", + "category_tks": "y", + }, + ] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert out["country"] == ["Brazil", "Turkey"] + assert out["category"] == ["Economy", "Disaster"] + + def test_aggregate_auto_mode_returns_empty(self, es_engine): + task = { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_mode": "auto", + "table_column_roles": {"country": "metadata"}, + }, + } + assert aggregate_table_manual_doc_metadata([{"country_tks": "x"}], task) == {} + + def test_aggregate_no_mode_returns_empty(self, es_engine): + task = { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_roles": {"country": "metadata"}, + }, + } + assert aggregate_table_manual_doc_metadata([{}], task) == {} + + def test_aggregate_no_metadata_columns(self, es_engine): + task = { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_mode": "manual", + "table_column_roles": {"country": "indexing"}, + "table_column_names": ["country"], + }, + } + assert aggregate_table_manual_doc_metadata([{"country_tks": "x"}], task) == {} + + def test_aggregate_prefers_raw_over_tks(self, es_engine): + task = _table_task() + task["kb_parser_config"]["table_column_roles"] = {"country": "metadata"} + task["kb_parser_config"]["table_column_names"] = ["country"] + chunks = [{"country_raw": "Brazil", "country_tks": ["brazil"]}] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert out == {"country": ["Brazil"]} + + def test_aggregate_tks_fallback(self, es_engine): + task = _table_task() + task["kb_parser_config"]["table_column_roles"] = {"country": "metadata"} + task["kb_parser_config"]["table_column_names"] = ["country"] + chunks = [{"country_tks": ["brazil"]}] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert out == {"country": ["brazil"]} + + def test_aggregate_partial_roles_defaults_to_both(self, es_engine): + task = { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_mode": "manual", + "table_column_roles": {"country": "indexing"}, + "table_column_names": ["country", "city"], + "field_map": {"city_tks": "city"}, + }, + } + chunks = [{"city_raw": "SP", "city_tks": "t", "country_tks": "x"}] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert out == {"city": ["SP"]} + assert "country" not in out + + def test_aggregate_empty_roles_all_columns_both(self, es_engine): + task = { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_mode": "manual", + "table_column_roles": {}, + "table_column_names": ["country", "city"], + "field_map": {"country_tks": "country", "city_tks": "city"}, + }, + } + chunks = [ + {"country_raw": "BR", "city_raw": "SP", "country_tks": "x", "city_tks": "y"}, + ] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert "country" in out and "city" in out + + def test_aggregate_deduplicates_values(self, es_engine): + task = _table_task() + task["kb_parser_config"]["table_column_roles"] = {"country": "metadata"} + task["kb_parser_config"]["table_column_names"] = ["country"] + chunks = [ + {"country_raw": "US", "country_tks": "x"}, + {"country_raw": "UK", "country_tks": "y"}, + {"country_raw": "US", "country_tks": "x"}, + ] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert out["country"] == ["US", "UK"] + + def test_aggregate_kb_reload_field_map(self, es_engine, monkeypatch): + from unittest.mock import MagicMock + + class MockKBS: + @staticmethod + def get_by_id(kid): + kb = MagicMock() + kb.parser_config = {"field_map": {"country_tks": "country"}} + return True, kb + + monkeypatch.setattr( + "rag.utils.table_es_metadata._knowledgebase_service_cls", + lambda: MockKBS, + ) + + task = { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_mode": "manual", + "table_column_roles": {"country": "metadata"}, + "table_column_names": ["country"], + }, + "kb_id": "kb-1", + } + chunks = [{"country_raw": "X", "country_tks": "t"}] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert out == {"country": ["X"]} + + def test_merge_infinity_chunk_data(self, infinity_engine): + task = { + "parser_id": "table", + "parser_config": {}, + "kb_parser_config": { + "table_column_mode": "manual", + "table_column_roles": {"country": "both"}, + "table_column_names": ["country"], + }, + } + chunks = [ + {"chunk_data": {"country": "US"}}, + {"chunk_data": {"country": "UK"}}, + ] + out = aggregate_table_manual_doc_metadata(chunks, task) + assert out == {"country": ["US", "UK"]} + + +class TestMergeTableParserConfigFromKbExtra: + """Merge tests also covered in helpers file; keep one explicit case for aggregation module.""" + + def test_merge_preserves_parser_config_when_parser_not_table(self): + task = { + "parser_id": "naive", + "parser_config": {"a": 1}, + "kb_parser_config": {"table_column_mode": "manual"}, + } + assert merge_table_parser_config_from_kb(task) == {"a": 1} diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 9078dc749e..a13ff2263b 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -713,6 +713,21 @@ Example: A 1 KB message with 1024-dim embedding uses ~9 KB. The 5 MB default lim portugueseBr: 'Portuguese (Brazil)', embeddingModelPlaceholder: 'Please select a embedding model.', chunkMethodPlaceholder: 'Please select a chunking method.', + tableColumnMode: 'Column mode', + tableColumnModeAuto: 'Auto', + tableColumnModeManual: 'Manual', + tableColumnModeAutoDescription: + 'All columns are included in chunk text and stored as metadata (RAGFlow default).', + tableColumnRoles: 'Column roles', + tableColumnRolesTip: + 'Choose which columns to include in chunk text (indexed for vector and full-text search), in metadata only (filterable), or both. Changes apply to new parses; re-parse existing documents for roles to take effect.', + tableColumnRoleIndexing: 'Indexing', + tableColumnRoleMetadata: 'Metadata', + tableColumnRoleBoth: 'Both', + tableColumnRolesEmpty: + 'Upload and parse a CSV or Excel file to begin configuring column roles.', + tableColumnRolesReparseTip: + 'Re-parse existing documents for the new column roles to take effect.', parserLabel: { naive: 'General', qa: 'Q&A', diff --git a/web/src/pages/dataset/dataset-setting/configuration/table.tsx b/web/src/pages/dataset/dataset-setting/configuration/table.tsx index ecf9fc7cc2..40febbf0e4 100644 --- a/web/src/pages/dataset/dataset-setting/configuration/table.tsx +++ b/web/src/pages/dataset/dataset-setting/configuration/table.tsx @@ -1,12 +1,155 @@ +import { FormControl, FormItem, FormLabel } from '@/components/ui/form'; +import { RadioGroup, RadioGroupItem } from '@/components/ui/radio-group'; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '@/components/ui/select'; +import { useTranslate } from '@/hooks/common-hooks'; +import { useFormContext, useWatch } from 'react-hook-form'; import { ConfigurationFormContainer } from '../configuration-form-container'; +const ROLE_OPTIONS = [ + { value: 'both', labelKey: 'tableColumnRoleBoth' }, + { value: 'indexing', labelKey: 'tableColumnRoleIndexing' }, + { value: 'metadata', labelKey: 'tableColumnRoleMetadata' }, +] as const; + +function selectTableColumnRoleValue(raw: string | undefined): string { + if (!raw) return 'both'; + return raw === 'vectorize' ? 'indexing' : raw; +} + export function TableConfiguration() { + const form = useFormContext(); + const { t } = useTranslate('knowledgeConfiguration'); + + const tableColumnMode = useWatch({ + control: form.control, + name: 'parser_config.table_column_mode', + defaultValue: 'auto', + }); + const tableColumnNames = useWatch({ + control: form.control, + name: 'parser_config.table_column_names', + defaultValue: [], + }); + const tableColumnRoles = useWatch({ + control: form.control, + name: 'parser_config.table_column_roles', + defaultValue: {}, + }); + + const mode = tableColumnMode === 'manual' ? 'manual' : 'auto'; + const columns: string[] = Array.isArray(tableColumnNames) + ? tableColumnNames + : []; + + const handleModeChange = (value: string) => { + form.setValue( + 'parser_config.table_column_mode', + value as 'auto' | 'manual', + ); + }; + + const handleRoleChange = (columnName: string, role: string) => { + const current = + (form.getValues('parser_config.table_column_roles') as Record< + string, + string + >) || {}; + form.setValue('parser_config.table_column_roles', { + ...current, + [columnName]: role, + }); + }; + return ( - {/* - + + + {t('tableColumnMode')} + + + +
+ + +
+
+ + +
+
+
+
- */} + {mode === 'auto' && ( +

+ {t('tableColumnModeAutoDescription')} +

+ )} + + {mode === 'manual' && columns.length === 0 && ( +

+ {t('tableColumnRolesEmpty')} +

+ )} + + {mode === 'manual' && columns.length > 0 && ( + <> +

+ {t('tableColumnRolesTip')} +

+
+ {columns.map((col) => ( + + + {col} + + + + + + ))} +
+

+ {t('tableColumnRolesReparseTip')} +

+ + )}
); } diff --git a/web/src/pages/dataset/dataset-setting/form-schema.ts b/web/src/pages/dataset/dataset-setting/form-schema.ts index 18801349da..7aef591f07 100644 --- a/web/src/pages/dataset/dataset-setting/form-schema.ts +++ b/web/src/pages/dataset/dataset-setting/form-schema.ts @@ -94,6 +94,18 @@ export const formSchema = z .optional(), enable_metadata: z.boolean().optional(), llm_id: z.string().optional(), + // Table parser: "auto" = all columns both, "manual" = use column role selector + table_column_mode: z.enum(['auto', 'manual']).optional(), + // Table parser: column name -> role (indexing | metadata | both); legacy "vectorize" -> indexing + table_column_roles: z + .record( + z + .enum(['indexing', 'metadata', 'both', 'vectorize']) + .transform((role) => (role === 'vectorize' ? 'indexing' : role)), + ) + .optional(), + // Table parser: column names list (set by backend after first parse) + table_column_names: z.array(z.string()).optional(), }) .optional(), pagerank: z.number(),