ragflow/common/doc_store/es_conn_base.py
Rene Arredondo 3f929e3904 fix(es): downgrade LLM-generated invalid SQL to WARNING in ES sql() (#15409) (#15709)
## Summary

Fixes #15409.

The reporter sees alarming ERROR-level stack traces in `ragflow_server.log` on
every chat turn against a knowledge base whose spreadsheet has many
columns with embedded IDs (e.g. `id-wstc-bios fvt-322-wstc-bios
fvt-323`). Simple queries work; complex ones return "No answer" with
logs that look like a hard crash.

### What's actually happening

1. The user uploads a wide Excel/CSV.
[rag/app/table.py:477-493](rag/app/table.py#L477-L493) turns each header
into an ES field with a type suffix, e.g. `id-wstc-bios
fvt-322-wstc-bios fvt-323_tks`. This is correct — the parser faithfully
encodes the user's column names.
2. The user asks about test case `fvt-085`. The SQL chat path in
[api/db/services/dialog_service.py:914
use_sql](api/db/services/dialog_service.py#L914) asks the LLM to write
SQL using the field list. The LLM sees the `id-wstc-bios
fvt-NNN-wstc-bios fvt-MMM_tks` pattern and pattern-completes a
plausible-but-nonexistent column.
3. Elasticsearch rejects with `BadRequestError(400,
'verification_exception')`: `Unknown column [id-wstc-bios
fvt-085-wstc-bios fvt-086_tks]` and suggests the closest valid column.
4. **The recovery path already exists**: `use_sql` catches the
exception, re-prompts the LLM with the error text (which contains ES's
"did you mean" hint), and on second failure the caller at
[api/db/services/dialog_service.py:626](api/db/services/dialog_service.py#L626)
falls back to vector search. The chat does produce an answer — it's just
generated from the vector hits instead of SQL.
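
The four steps above amount to a retry-then-fallback loop. The sketch below is a simplified illustration, not the project's code: `generate_sql`, `run_sql`, and `vector_search` are hypothetical callables standing in for the LLM, Elasticsearch, and retrieval calls, and the retry (which lives in `use_sql`) and the fallback (which lives in its caller) are folded into one function for brevity.

```python
def answer_with_sql_or_fallback(question, generate_sql, run_sql, vector_search, max_attempts=2):
    """Try LLM-written SQL up to max_attempts times, then fall back to vector search."""
    last_error = None
    for _ in range(max_attempts):
        # From the second attempt on, the previous error text is fed back to the LLM.
        sql = generate_sql(question, last_error)
        try:
            return "sql", run_sql(sql)
        except Exception as e:
            last_error = str(e)
    return "vector", vector_search(question)


# Hypothetical stubs that exercise both outcomes.
VALID_COLUMNS = {"name_tks"}


def fake_run_sql(sql):
    column = sql.split()[1]
    if column not in VALID_COLUMNS:
        raise Exception(f"Unknown column [{column}], did you mean [name_tks]?")
    return [["row"]]


def llm_that_recovers(question, last_error):
    # Switches to the valid column once it has seen an error message.
    return "SELECT name_tks FROM idx" if last_error else "SELECT nmae_tks FROM idx"


def llm_that_never_recovers(question, last_error):
    return "SELECT nmae_tks FROM idx"


def fake_vector_search(question):
    return ["vector hit"]


recovered = answer_with_sql_or_fallback("q", llm_that_recovers, fake_run_sql, fake_vector_search)
fell_back = answer_with_sql_or_fallback("q", llm_that_never_recovers, fake_run_sql, fake_vector_search)
```

In the first call the second attempt succeeds, so the answer comes from SQL; in the second call both attempts fail and the answer comes from the vector hits. Either way, every failed attempt passes through the ES `sql()` error handler, which is why its log level matters.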

The only real bug is logging:

- [common/doc_store/es_conn_base.py:399](common/doc_store/es_conn_base.py#L399)
  catches every exception with `self.logger.exception(...)`, which writes
  a full traceback at **ERROR** level.
- For LLM-generated SQL this is the hot path, not an exceptional
condition — it can fire twice per turn before the fallback runs.
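
To make the difference concrete, here is a small self-contained demonstration that uses only the standard `logging` module. The logger name and messages are illustrative and not taken from the codebase.

```python
import io
import logging

# Capture log output in memory so it can be inspected.
buf = io.StringIO()
logger = logging.getLogger("demo.es_conn")  # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logger.addHandler(handler)

try:
    raise ValueError("Unknown column [foo_tks]")
except ValueError as e:
    logger.exception("sql failed")        # ERROR level, traceback appended
    logger.warning(f"sql rejected: {e}")  # WARNING level, single line

output = buf.getvalue()
```

`output` holds one ERROR entry followed by a traceback and one single-line WARNING entry. The second form is what an expected, recoverable failure should look like in the server log.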

### Fix

Catch `elasticsearch.BadRequestError` (the exception the client raises for
HTTP 400 responses, which covers `verification_exception`,
`parsing_exception`, and similar SQL-validity errors) separately and log it
at **WARNING** with the SQL plus the ES error message. The message still
carries the unknown column name and ES's suggested alternative, so it stays
actionable for anyone investigating "why is my LLM producing bad SQL?",
just without the misleading stack trace.

Other exception types (`ConnectionTimeout`, generic `Exception`) keep
their original `ERROR`-level traceback treatment; those represent real
connectivity / library bugs.
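
The ordering of the `except` arms can be sketched in isolation. This is an illustration under stated assumptions: `FakeBadRequestError` is a stand-in for `elasticsearch.BadRequestError` so the example runs without the client installed, and `run_sql`, `ListHandler`, and the logger name are hypothetical.

```python
import logging

logger = logging.getLogger("demo.sql")  # hypothetical logger name
logger.propagate = False


class FakeBadRequestError(Exception):
    """Stand-in for elasticsearch.BadRequestError (HTTP 400)."""


class ListHandler(logging.Handler):
    """Collects records in memory so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
logger.addHandler(handler)


def run_sql(execute, sql):
    try:
        return execute(sql)
    except FakeBadRequestError as e:
        # Expected when the LLM invents a column: one WARNING line, no traceback.
        logger.warning(f"SQL rejected (likely invalid LLM-generated SQL). SQL:\n{sql}\nError: {e}")
        raise Exception(f"SQL error: {e}\n\nSQL: {sql}")
    except Exception as e:
        # Anything else is unexpected: keep the ERROR-level traceback.
        logger.exception(f"SQL got exception. SQL:\n{sql}")
        raise Exception(f"SQL error: {e}\n\nSQL: {sql}")


def reject(sql):
    raise FakeBadRequestError("Unknown column [x_tks]")


def boom(sql):
    raise RuntimeError("connection reset")


for execute in (reject, boom):
    try:
        run_sql(execute, "SELECT x_tks FROM idx")
    except Exception:
        pass  # the caller would retry or fall back here

# (level name, traceback attached?) for each captured record
levels = [(r.levelname, r.exc_info is not None) for r in handler.records]
```

The specific arm has to come before the generic one, because Python takes the first matching `except` clause. Both arms raise the same wrapped `Exception`, so callers see no behavioral difference; only the log record changes.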

This is a one-file, two-line-net change. The retry loop in `use_sql`,
the `add_kb_filter` injection, and the vector-search fallback are all
unchanged.

### What this PR does NOT change

- **The LLM prompts in `use_sql`** — they already specify `Use EXACT
field names from the schema` and pass the field list explicitly.
Strengthening them risks regressing well-behaved cases and is out of
scope for #15409.
- **The single-retry policy** — extending it to multi-retry with
extracted ES suggestions is a separate enhancement.
- **The parser at `rag/app/table.py`** — the field names match the
user's actual column headers; the parser is doing its job.

## Files changed

- [common/doc_store/es_conn_base.py](common/doc_store/es_conn_base.py)
  - Add `BadRequestError` to the `elasticsearch` import.
  - In `ESConnectionBase.sql()`, add an `except BadRequestError` arm above
    the generic `except Exception` that logs at WARNING and raises the same
    wrapped `Exception` as the generic arm (so the `use_sql` retry/fallback
    still triggers).
2026-06-11 15:04:52 +08:00

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import re
import json
import time
import os
from abc import abstractmethod
from elasticsearch import BadRequestError, NotFoundError
from elasticsearch_dsl import Index
from elastic_transport import ConnectionTimeout
from elasticsearch.client import IndicesClient
from common.file_utils import get_project_base_directory
from common.misc_utils import convert_bytes
from common.doc_store.doc_store_base import DocStoreConnection, OrderByExpr, MatchExpr
from rag.nlp import is_english, rag_tokenizer
from common import settings

ATTEMPT_TIME = 2


class ESConnectionBase(DocStoreConnection):
    def __init__(self, mapping_file_name: str="mapping.json", logger_name: str='ragflow.es_conn'):
        from common.doc_store.es_conn_pool import ES_CONN

        self.logger = logging.getLogger(logger_name)
        self.info = {}
        self.logger.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.")
        self.es = ES_CONN.get_conn()
        fp_mapping = os.path.join(get_project_base_directory(), "conf", mapping_file_name)
        if not os.path.exists(fp_mapping):
            msg = f"Elasticsearch mapping file not found at {fp_mapping}"
            self.logger.error(msg)
            raise Exception(msg)
        with open(fp_mapping, "r") as f:
            self.mapping = json.load(f)
        self.logger.info(f"Elasticsearch {settings.ES['hosts']} is healthy.")

    def _connect(self):
        from common.doc_store.es_conn_pool import ES_CONN

        if self.es.ping():
            return True
        self.es = ES_CONN.refresh_conn()
        return True

    """
    Database operations
    """

    def db_type(self) -> str:
        return "elasticsearch"

    def health(self) -> dict:
        health_dict = dict(self.es.cluster.health())
        health_dict["type"] = "elasticsearch"
        return health_dict

    def get_cluster_stats(self):
        """
        curl -XGET "http://{es_host}/_cluster/stats" -H "kbn-xsrf: reporting" to view raw stats.
        """
        raw_stats = self.es.cluster.stats()
        self.logger.debug(f"ESConnection.get_cluster_stats: {raw_stats}")
        try:
            res = {
                'cluster_name': raw_stats['cluster_name'],
                'status': raw_stats['status']
            }
            indices_status = raw_stats['indices']
            res.update({
                'indices': indices_status['count'],
                'indices_shards': indices_status['shards']['total']
            })
            doc_info = indices_status['docs']
            res.update({
                'docs': doc_info['count'],
                'docs_deleted': doc_info['deleted']
            })
            store_info = indices_status['store']
            res.update({
                'store_size': convert_bytes(store_info['size_in_bytes']),
                'total_dataset_size': convert_bytes(store_info['total_data_set_size_in_bytes'])
            })
            mappings_info = indices_status['mappings']
            res.update({
                'mappings_fields': mappings_info['total_field_count'],
                'mappings_deduplicated_fields': mappings_info['total_deduplicated_field_count'],
                'mappings_deduplicated_size': convert_bytes(mappings_info['total_deduplicated_mapping_size_in_bytes'])
            })
            node_info = raw_stats['nodes']
            res.update({
                'nodes': node_info['count']['total'],
                'nodes_version': node_info['versions'],
                'os_mem': convert_bytes(node_info['os']['mem']['total_in_bytes']),
                'os_mem_used': convert_bytes(node_info['os']['mem']['used_in_bytes']),
                'os_mem_used_percent': node_info['os']['mem']['used_percent'],
                'jvm_versions': node_info['jvm']['versions'][0]['vm_version'],
                'jvm_heap_used': convert_bytes(node_info['jvm']['mem']['heap_used_in_bytes']),
                'jvm_heap_max': convert_bytes(node_info['jvm']['mem']['heap_max_in_bytes'])
            })
            return res
        except Exception as e:
            self.logger.exception(f"ESConnection.get_cluster_stats: {e}")
            return None

    """
    Table operations
    """

    def create_idx(self, index_name: str, dataset_id: str, vector_size: int, parser_id: str = None):
        # parser_id is used by Infinity but not needed for ES (kept for interface compatibility)
        if self.index_exist(index_name, dataset_id):
            return True
        try:
            return IndicesClient(self.es).create(index=index_name,
                                                 settings=self.mapping["settings"],
                                                 mappings=self.mapping["mappings"])
        except Exception:
            self.logger.exception("ESConnection.createIndex error %s" % index_name)

    def create_doc_meta_idx(self, index_name: str):
        """
        Create a document metadata index.
        Index name pattern: ragflow_doc_meta_{tenant_id}
        - Per-tenant metadata index for storing document metadata fields
        """
        if self.index_exist(index_name, ""):
            return True
        try:
            fp_mapping = os.path.join(get_project_base_directory(), "conf", "doc_meta_es_mapping.json")
            if not os.path.exists(fp_mapping):
                self.logger.error(f"Document metadata mapping file not found at {fp_mapping}")
                return False
            with open(fp_mapping, "r") as f:
                doc_meta_mapping = json.load(f)
            return IndicesClient(self.es).create(index=index_name,
                                                 settings=doc_meta_mapping["settings"],
                                                 mappings=doc_meta_mapping["mappings"])
        except Exception as e:
            self.logger.exception(f"Error creating document metadata index {index_name}: {e}")

    def refresh_idx(self, index_name: str) -> bool:
        """
        Refresh an index so that recently inserted documents become searchable.
        Service layers should call this dispatch method instead of reaching
        into ``self.es`` directly, so the OpenSearch and Elasticsearch
        connections present a uniform abstract API.
        """
        try:
            self.es.indices.refresh(index=index_name)
            return True
        except NotFoundError:
            return False
        except Exception as e:
            self.logger.warning(f"ESConnection.refresh_idx({index_name}) failed: {e}")
            return False

    def count_idx(self, index_name: str) -> int:
        """
        Return the document count for an index, or -1 if the call fails.
        Used to decide whether a per-tenant metadata index is empty without
        paying a full search.
        """
        try:
            response = self.es.count(index=index_name)
            return int(response.get("count", 0))
        except NotFoundError:
            return 0
        except Exception as e:
            self.logger.warning(f"ESConnection.count_idx({index_name}) failed: {e}")
            return -1

    def replace_meta_fields(self, index_name: str, doc_id: str, meta_fields: dict) -> bool:
        """
        Fully replace the ``meta_fields`` object on a single document.
        Using ES.update with a ``doc`` body would deep-merge object fields,
        retaining old keys that should be removed. A scripted update assigns
        the new meta_fields outright, matching delete-key semantics.
        """
        body = {
            "script": {
                "source": "ctx._source.meta_fields = params.meta_fields",
                "params": {"meta_fields": meta_fields},
            }
        }
        try:
            self.es.update(index=index_name, id=doc_id, refresh=True, body=body)
            return True
        except NotFoundError:
            return False
        except Exception as e:
            self.logger.warning(f"ESConnection.replace_meta_fields({index_name}, {doc_id}) failed: {e}")
            return False

    def delete_idx(self, index_name: str, dataset_id: str):
        if len(dataset_id) > 0:
            # The index need to be alive after any kb deletion since all kb under this tenant are in one index.
            return
        try:
            self.es.indices.delete(index=index_name, allow_no_indices=True)
        except NotFoundError:
            pass
        except Exception:
            self.logger.exception("ESConnection.deleteIdx error %s" % index_name)

    def index_exist(self, index_name: str, dataset_id: str = None) -> bool:
        s = Index(index_name, self.es)
        for i in range(ATTEMPT_TIME):
            try:
                return s.exists()
            except ConnectionTimeout:
                self.logger.exception("ES request timeout")
                time.sleep(3)
                self._connect()
                continue
            except Exception as e:
                self.logger.exception(e)
                break
        return False

    """
    CRUD operations
    """

    def get(self, doc_id: str, index_name: str, dataset_ids: list[str]) -> dict | None:
        for i in range(ATTEMPT_TIME):
            try:
                res = self.es.get(index=index_name,
                                  id=doc_id, source=True, )
                if str(res.get("timed_out", "")).lower() == "true":
                    raise Exception("Es Timeout.")
                doc = res["_source"]
                doc["id"] = doc_id
                return doc
            except NotFoundError:
                return None
            except Exception as e:
                self.logger.exception(f"ESConnection.get({doc_id}) got exception")
                raise e
        self.logger.error(f"ESConnection.get timeout for {ATTEMPT_TIME} times!")
        raise Exception("ESConnection.get timeout.")

    @abstractmethod
    def search(
            self, select_fields: list[str],
            highlight_fields: list[str],
            condition: dict,
            match_expressions: list[MatchExpr],
            order_by: OrderByExpr,
            offset: int,
            limit: int,
            index_names: str | list[str],
            dataset_ids: list[str],
            agg_fields: list[str] | None = None,
            rank_feature: dict | None = None
    ):
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def insert(self, documents: list[dict], index_name: str, dataset_id: str = None) -> list[str]:
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def update(self, condition: dict, new_value: dict, index_name: str, dataset_id: str) -> bool:
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def delete(self, condition: dict, index_name: str, dataset_id: str) -> int:
        raise NotImplementedError("Not implemented")

    """
    Helper functions for search result
    """

    def get_total(self, res):
        if isinstance(res["hits"]["total"], type({})):
            return res["hits"]["total"]["value"]
        return res["hits"]["total"]

    def get_doc_ids(self, res):
        return [d["_id"] for d in res["hits"]["hits"]]

    def get_scores(self, res) -> dict[str, float]:
        """
        Map hit `_id` to its raw `_score`. Used to recover the cosine
        similarity returned by a KNN-only search without pulling the
        chunk vectors out of the index.
        """
        out = {}
        for d in res.get("hits", {}).get("hits", []):
            doc_id = d.get("_id")
            if doc_id is None:
                continue
            score = d.get("_score")
            out[doc_id] = float(score) if score is not None else 0.0
        return out

    def _get_source(self, res):
        rr = []
        for d in res["hits"]["hits"]:
            d["_source"]["id"] = d["_id"]
            d["_source"]["_score"] = d["_score"]
            rr.append(d["_source"])
        return rr

    @abstractmethod
    def get_fields(self, res, fields: list[str]) -> dict[str, dict]:
        raise NotImplementedError("Not implemented")

    def get_highlight(self, res, keywords: list[str], field_name: str):
        ans = {}
        for d in res["hits"]["hits"]:
            highlights = d.get("highlight")
            if not highlights:
                continue
            txt = "...".join([a for a in list(highlights.items())[0][1]])
            if not is_english(txt.split()):
                ans[d["_id"]] = txt
                continue

            txt = d["_source"][field_name]
            txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE)
            txt_list = []
            for t in re.split(r"[.?!;\n]", txt):
                for w in keywords:
                    t = re.sub(r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])" % re.escape(w), r"\1<em>\2</em>\3", t,
                               flags=re.IGNORECASE | re.MULTILINE)
                if not re.search(r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE):
                    continue
                txt_list.append(t)
            ans[d["_id"]] = "...".join(txt_list) if txt_list else "...".join([a for a in list(highlights.items())[0][1]])

        return ans

    def get_aggregation(self, res, field_name: str):
        agg_field = "aggs_" + field_name
        if "aggregations" not in res or agg_field not in res["aggregations"]:
            return list()
        buckets = res["aggregations"][agg_field]["buckets"]
        return [(b["key"], b["doc_count"]) for b in buckets]

    """
    SQL
    """

    def sql(self, sql: str, fetch_size: int, format: str):
        self.logger.debug(f"ESConnection.sql get sql: {sql}")
        sql = re.sub(r"[ `]+", " ", sql)
        sql = sql.replace("%", "")
        replaces = []
        for r in re.finditer(r" ([a-z_]+_l?tks)( like | ?= ?)'([^']+)'", sql):
            fld, v = r.group(1), r.group(3)
            match = " MATCH({}, '{}', 'operator=OR;minimum_should_match=30%') ".format(
                fld, rag_tokenizer.fine_grained_tokenize(rag_tokenizer.tokenize(v)))
            replaces.append(
                ("{}{}'{}'".format(
                    r.group(1),
                    r.group(2),
                    r.group(3)),
                 match))

        for p, r in replaces:
            sql = sql.replace(p, r, 1)
        self.logger.debug(f"ESConnection.sql to es: {sql}")

        for i in range(ATTEMPT_TIME):
            try:
                res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format,
                                        request_timeout="2s")
                return res
            except ConnectionTimeout:
                self.logger.exception("ES request timeout")
                time.sleep(3)
                self._connect()
                continue
            except BadRequestError as e:
                # LLM-generated SQL routinely references columns that don't exist
                # (e.g. unknown_column / verification_exception). The caller in
                # api/db/services/dialog_service.py:use_sql catches this and either
                # re-prompts the LLM with the error or falls back to vector search,
                # so a full ERROR-level traceback is misleading; see #15409.
                self.logger.warning(f"ESConnection.sql rejected by ES (likely invalid LLM-generated SQL). SQL:\n{sql}\nError: {e}")
                raise Exception(f"SQL error: {e}\n\nSQL: {sql}")
            except Exception as e:
                self.logger.exception(f"ESConnection.sql got exception. SQL:\n{sql}")
                raise Exception(f"SQL error: {e}\n\nSQL: {sql}")
        self.logger.error(f"ESConnection.sql timeout for {ATTEMPT_TIME} times!")
        return None