mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
Fix: ExeSQL node continues on per-statement SQL errors (#15140)
Wrap per-statement execution in both the generic and IBM DB2 loops so a failing statement reports a friendly "SQL Execution Failed" message and continues, instead of letting a raw driver exception abort the node and discard results from statements that already succeeded. Rolls back after a failure so PostgreSQL's aborted-transaction state does not cascade into every subsequent statement in the batch. ### What problem does this PR solve? Closes #14737 The **ExeSQL** agent node splits its input on `;` and runs each statement in a loop. Both execution loops — the generic one (`cursor.execute`) and the IBM DB2 one (`ibm_db.exec_immediate`) — were wrapped only in a `try/finally` for resource cleanup, with **no `except`** around statement execution. As a result, when any single statement failed (e.g. the reporter's MSSQL `('42S02', "[42S02] ... 对象名 'ASSET_AUDIT' 无效")`): - The raw, unformatted driver exception bubbled up and the node failed with an ugly `_ERROR` instead of friendly information. - **The whole node aborted** — results from statements that had already succeeded were discarded, and the remaining statements in the batch never ran. The reporter confirmed this was the real pain point: *"after reporting an exception, the previous normal query cannot be executed properly … Do not interrupt the workflow for any issues."* Connection-level failures were already wrapped with a friendly `"Database Connection Failed!"` prefix — only per-statement execution errors were missed. **This PR** wraps per-statement execution in `try/except` in both loops. A failing statement now: - records a friendly `SQL Execution Failed: <sql>\n<error>` entry into the `json` and `formalized_content` outputs (the actual DB error is kept so the user can see *what* failed), and - `continue`s to the next statement — so earlier results survive and later statements still run. After a failure in the generic loop, the connection is rolled back so PostgreSQL's aborted-transaction state does not cascade into every subsequent statement in the batch. The node returns normally (no `_ERROR` raised), so the agent workflow proceeds instead of halting. Connection failures remain fatal (correct — nothing can run without a connection). The pre-existing `break` on `cursor.rowcount == 0` is intentionally left unchanged; it is out of scope for this fix. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -208,28 +208,37 @@ class ExeSQL(ToolBase, ABC):
|
||||
continue
|
||||
single_sql = re.sub(r"\[ID:[0-9]+\]", "", single_sql)
|
||||
|
||||
stmt = ibm_db.exec_immediate(conn, single_sql)
|
||||
rows = []
|
||||
row = ibm_db.fetch_assoc(stmt)
|
||||
while row and len(rows) < self._param.max_records:
|
||||
if self.check_if_canceled("ExeSQL processing"):
|
||||
return
|
||||
rows.append(row)
|
||||
try:
|
||||
stmt = ibm_db.exec_immediate(conn, single_sql)
|
||||
rows = []
|
||||
row = ibm_db.fetch_assoc(stmt)
|
||||
while row and len(rows) < self._param.max_records:
|
||||
if self.check_if_canceled("ExeSQL processing"):
|
||||
return
|
||||
rows.append(row)
|
||||
row = ibm_db.fetch_assoc(stmt)
|
||||
|
||||
if not rows:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
if not rows:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
continue
|
||||
|
||||
df = pd.DataFrame(rows)
|
||||
for col in df.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(df[col]):
|
||||
df[col] = df[col].dt.strftime("%Y-%m-%d")
|
||||
|
||||
df = df.where(pd.notnull(df), None)
|
||||
|
||||
sql_res.append(convert_decimals(df.to_dict(orient="records")))
|
||||
formalized_content.append(df.to_markdown(index=False, floatfmt=".6f"))
|
||||
except Exception as e:
|
||||
# Keep the node alive on a bad statement: report and continue.
|
||||
with contextlib.suppress(Exception):
|
||||
ibm_db.rollback(conn)
|
||||
msg = f"SQL Execution Failed: {single_sql}\n{str(e)}"
|
||||
sql_res.append({"content": msg})
|
||||
formalized_content.append(msg)
|
||||
continue
|
||||
|
||||
df = pd.DataFrame(rows)
|
||||
for col in df.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(df[col]):
|
||||
df[col] = df[col].dt.strftime("%Y-%m-%d")
|
||||
|
||||
df = df.where(pd.notnull(df), None)
|
||||
|
||||
sql_res.append(convert_decimals(df.to_dict(orient="records")))
|
||||
formalized_content.append(df.to_markdown(index=False, floatfmt=".6f"))
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
ibm_db.close(conn)
|
||||
@@ -259,25 +268,37 @@ class ExeSQL(ToolBase, ABC):
|
||||
sql_res.append({"content": "For security reasons, INSERT, UPDATE, and DELETE statements are not supported."})
|
||||
formalized_content.append("For security reasons, INSERT, UPDATE, and DELETE statements are not supported.")
|
||||
continue
|
||||
cursor.execute(single_sql)
|
||||
if cursor.rowcount == 0:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
break
|
||||
if self._param.db_type == 'mssql':
|
||||
single_res = pd.DataFrame.from_records(cursor.fetchmany(self._param.max_records),
|
||||
columns=[desc[0] for desc in cursor.description])
|
||||
else:
|
||||
single_res = pd.DataFrame([i for i in cursor.fetchmany(self._param.max_records)])
|
||||
single_res.columns = [i[0] for i in cursor.description]
|
||||
try:
|
||||
cursor.execute(single_sql)
|
||||
if cursor.rowcount == 0:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
break
|
||||
if self._param.db_type == 'mssql':
|
||||
single_res = pd.DataFrame.from_records(cursor.fetchmany(self._param.max_records),
|
||||
columns=[desc[0] for desc in cursor.description])
|
||||
else:
|
||||
single_res = pd.DataFrame([i for i in cursor.fetchmany(self._param.max_records)])
|
||||
single_res.columns = [i[0] for i in cursor.description]
|
||||
|
||||
for col in single_res.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(single_res[col]):
|
||||
single_res[col] = single_res[col].dt.strftime('%Y-%m-%d')
|
||||
for col in single_res.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(single_res[col]):
|
||||
single_res[col] = single_res[col].dt.strftime('%Y-%m-%d')
|
||||
|
||||
single_res = single_res.where(pd.notnull(single_res), None)
|
||||
single_res = single_res.where(pd.notnull(single_res), None)
|
||||
|
||||
sql_res.append(convert_decimals(single_res.to_dict(orient='records')))
|
||||
formalized_content.append(single_res.to_markdown(index=False, floatfmt=".6f"))
|
||||
sql_res.append(convert_decimals(single_res.to_dict(orient='records')))
|
||||
formalized_content.append(single_res.to_markdown(index=False, floatfmt=".6f"))
|
||||
except Exception as e:
|
||||
# A failing statement must not abort the node: report it and keep
|
||||
# going so earlier results survive and later statements still run.
|
||||
# The rollback clears PostgreSQL's aborted-transaction state, which
|
||||
# would otherwise make every subsequent statement fail too.
|
||||
with contextlib.suppress(Exception):
|
||||
db.rollback()
|
||||
msg = f"SQL Execution Failed: {single_sql}\n{str(e)}"
|
||||
sql_res.append({"content": msg})
|
||||
formalized_content.append(msg)
|
||||
continue
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
cursor.close()
|
||||
|
||||
Reference in New Issue
Block a user