feat(agent): add module-level debug logging for canvas execution flow (#16200)

Summary

Add module-level debug logging to track Agent canvas execution flow
(Closes #9306), enabling developers to diagnose component invocation,
input/output states, and variable resolution without modifying
production code.

Also fix related bugs in message.py: re.sub backreference issue and
unawaited _save_to_memory coroutine causing silent memory save failures.

Changes

agent/canvas.py: log workflow start, component invocation, and component
completion
agent/component/agent_with_tools.py: log Agent parameter resolution and
LLM invocation path; standardize json.dumps usage
agent/component/base.py: log get_input() variable resolution branches
agent/component/message.py: fix re.sub backreference issue; properly
await _save_to_memory coroutine

Design

Uses module-level loggers (logging.getLogger(__name__)) to support
selective debugging: LOG_LEVELS=agent=DEBUG
Zero performance impact in production (INFO level by default)
Works with existing PUT /system/config/log API for runtime level changes

Closes #9306

Note: While adding debug logging, I discovered and fixed two related
bugs in message.py:
- re.sub replacement value was interpreted as regex backreference
instead of literal string
- _save_to_memory coroutine was not properly awaited, causing silent
failures

---------

Co-authored-by: wills <willsgao@163.com>
This commit is contained in:
Willsgao
2026-06-29 09:41:16 +08:00
committed by yzc
parent dc07b6ca8f
commit 78db4e949b
4 changed files with 61 additions and 5 deletions

View File

@@ -40,6 +40,8 @@ from rag.prompts.generator import chunks_format
from rag.utils.redis_conn import REDIS_CONN from rag.utils.redis_conn import REDIS_CONN
from rag.utils.tts_cache import synthesize_with_cache from rag.utils.tts_cache import synthesize_with_cache
_logger = logging.getLogger(__name__)
class Graph: class Graph:
""" """
dsl = { dsl = {
@@ -438,6 +440,11 @@ class Canvas(Graph):
if not is_resume: if not is_resume:
yield decorate("workflow_started", {"inputs": kwargs.get("inputs")}) yield decorate("workflow_started", {"inputs": kwargs.get("inputs")})
_logger.debug(
"[Canvas] Workflow started. Path: %s, Inputs: %s",
[self.get_component_name(c) for c in self.path],
json.dumps(kwargs.get("inputs", {}), ensure_ascii=False, default=str)[:500],
)
self.retrieval.append({"chunks": {}, "doc_aggs": {}}) self.retrieval.append({"chunks": {}, "doc_aggs": {}})
async def _run_batch(f, t): async def _run_batch(f, t):
@@ -482,6 +489,13 @@ class Canvas(Graph):
if task_fn is None: if task_fn is None:
continue continue
_logger.debug(
"[Canvas] Invoking component '%s' (%s) with inputs: %s",
self.get_component_name(self.path[i - 1]),
cpn.component_name,
json.dumps(call_kwargs, ensure_ascii=False, default=str)[:500],
)
fn_invoke_async = getattr(cpn, "_invoke_async", None) fn_invoke_async = getattr(cpn, "_invoke_async", None)
use_async = (fn_invoke_async and asyncio.iscoroutinefunction(fn_invoke_async)) or asyncio.iscoroutinefunction(getattr(cpn, "_invoke", None)) use_async = (fn_invoke_async and asyncio.iscoroutinefunction(fn_invoke_async)) or asyncio.iscoroutinefunction(getattr(cpn, "_invoke", None))
tasks.append(asyncio.create_task(_invoke_one(cpn, task_fn, call_kwargs, use_async))) tasks.append(asyncio.create_task(_invoke_one(cpn, task_fn, call_kwargs, use_async)))
@@ -490,9 +504,17 @@ class Canvas(Graph):
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
def _node_finished(cpn_obj): def _node_finished(cpn_obj):
outputs = cpn_obj.output()
_logger.debug(
"[Canvas] Component '%s' (%s) finished. Outputs: %s, Error: %s",
self.get_component_name(cpn_obj._id),
self.get_component_type(cpn_obj._id),
json.dumps(outputs, ensure_ascii=False, default=str)[:500],
cpn_obj.error(),
)
return decorate("node_finished",{ return decorate("node_finished",{
"inputs": cpn_obj.get_input_values(), "inputs": cpn_obj.get_input_values(),
"outputs": cpn_obj.output(), "outputs": outputs,
"component_id": cpn_obj._id, "component_id": cpn_obj._id,
"component_name": self.get_component_name(cpn_obj._id), "component_name": self.get_component_name(cpn_obj._id),
"component_type": self.get_component_type(cpn_obj._id), "component_type": self.get_component_type(cpn_obj._id),

View File

@@ -34,6 +34,8 @@ from common.connection_utils import timeout
from common.mcp_tool_call_conn import MCPToolBinding, MCPToolCallSession, mcp_tool_metadata_to_openai_tool from common.mcp_tool_call_conn import MCPToolBinding, MCPToolCallSession, mcp_tool_metadata_to_openai_tool
from rag.prompts.generator import citation_plus, citation_prompt, full_question, kb_prompt, message_fit_in, structured_output_prompt from rag.prompts.generator import citation_plus, citation_prompt, full_question, kb_prompt, message_fit_in, structured_output_prompt
_logger = logging.getLogger(__name__)
class AgentParam(LLMParam, ToolParamBase): class AgentParam(LLMParam, ToolParamBase):
""" """
@@ -194,6 +196,14 @@ class Agent(LLM, ToolBase):
if self.check_if_canceled("Agent processing"): if self.check_if_canceled("Agent processing"):
return return
_logger.debug(
"[Agent] _invoke_async called. Component: %s, Keys in kwargs: %s, user_prompt: %s, tools count: %d",
self._id,
list(kwargs.keys()),
json.dumps(kwargs.get("user_prompt", ""), ensure_ascii=False, default=str)[:300],
len(self.tools) if self.tools else 0,
)
if kwargs.get("user_prompt"): if kwargs.get("user_prompt"):
usr_pmt = "" usr_pmt = ""
if kwargs.get("reasoning"): if kwargs.get("reasoning"):
@@ -205,10 +215,13 @@ class Agent(LLM, ToolBase):
else: else:
usr_pmt = str(kwargs["user_prompt"]) usr_pmt = str(kwargs["user_prompt"])
self._param.prompts = [{"role": "user", "content": usr_pmt}] self._param.prompts = [{"role": "user", "content": usr_pmt}]
_logger.debug("[Agent] Built user prompt with length=%d, reasoning=%s, context=%s",
len(usr_pmt), bool(kwargs.get("reasoning")), bool(kwargs.get("context")))
if not self.tools: if not self.tools:
if self.check_if_canceled("Agent processing"): if self.check_if_canceled("Agent processing"):
return return
_logger.debug("[Agent] No tools configured. Delegating to LLM._invoke_async. prompt_count=%d", len(self._param.prompts) if self._param.prompts else 0)
return await LLM._invoke_async(self, **kwargs) return await LLM._invoke_async(self, **kwargs)
prompt, msg, user_defined_prompt = self._prepare_prompt_variables() prompt, msg, user_defined_prompt = self._prepare_prompt_variables()
@@ -223,11 +236,13 @@ class Agent(LLM, ToolBase):
ex = self.exception_handler() ex = self.exception_handler()
has_message_downstream = any(self._canvas.get_component_obj(cid).component_name.lower() == "message" for cid in downstreams) has_message_downstream = any(self._canvas.get_component_obj(cid).component_name.lower() == "message" for cid in downstreams)
if has_message_downstream and not (ex and ex["goto"]) and not output_schema: if has_message_downstream and not (ex and ex["goto"]) and not output_schema:
_logger.debug("[Agent] Entering streaming mode (has message downstream)")
self.set_output("content", partial(self.stream_output_with_tools_async, prompt, deepcopy(msg), user_defined_prompt)) self.set_output("content", partial(self.stream_output_with_tools_async, prompt, deepcopy(msg), user_defined_prompt))
return return
msg = self._fit_messages(prompt, msg) msg = self._fit_messages(prompt, msg)
self._append_system_prompt(msg, schema_prompt) self._append_system_prompt(msg, schema_prompt)
_logger.debug("[Agent] Calling LLM with %d messages, has_schema=%s", len(msg), bool(schema_prompt))
ans = await self._generate_async(msg) ans = await self._generate_async(msg)
if ans.find("**ERROR**") >= 0: if ans.find("**ERROR**") >= 0:
@@ -257,6 +272,7 @@ class Agent(LLM, ToolBase):
artifact_md = self._collect_tool_artifact_markdown(existing_text=ans) artifact_md = self._collect_tool_artifact_markdown(existing_text=ans)
if artifact_md: if artifact_md:
ans += "\n\n" + artifact_md ans += "\n\n" + artifact_md
_logger.debug("[Agent] Final output. content_length=%d, has_artifact=%s", len(ans), bool(artifact_md))
self.set_output("content", ans) self.set_output("content", ans)
return ans return ans

View File

@@ -31,6 +31,8 @@ from common.connection_utils import timeout
from common.misc_utils import thread_pool_exec from common.misc_utils import thread_pool_exec
_logger = logging.getLogger(__name__)
_FEEDED_DEPRECATED_PARAMS = "_feeded_deprecated_params" _FEEDED_DEPRECATED_PARAMS = "_feeded_deprecated_params"
_DEPRECATED_PARAMS = "_deprecated_params" _DEPRECATED_PARAMS = "_deprecated_params"
_USER_FEEDED_PARAMS = "_user_feeded_params" _USER_FEEDED_PARAMS = "_user_feeded_params"
@@ -481,18 +483,28 @@ class ComponentBase(ABC):
return self._param.inputs.get(key, {}).get("value") return self._param.inputs.get(key, {}).get("value")
res = {} res = {}
for var, o in self.get_input_elements().items(): input_elements = self.get_input_elements()
_logger.debug(
"[Base] Component '%s' (%s) resolving inputs. Input element keys: %s",
self._id, self.component_name, list(input_elements.keys()),
)
for var, o in input_elements.items():
v = self.get_param(var) v = self.get_param(var)
if v is None: if v is None:
_logger.debug("[Base] var '%s': param is None, skipping", var)
continue continue
if isinstance(v, str) and self._canvas.is_reff(v): if isinstance(v, str) and self._canvas.is_reff(v):
self.set_input_value(var, self._canvas.get_variable_value(v)) resolved = self._canvas.get_variable_value(v)
self.set_input_value(var, resolved)
_logger.debug("[Base] var '%s': resolved ref '%s' -> %s", var, v, json.dumps(resolved, ensure_ascii=False, default=str)[:200])
elif isinstance(v, str) and re.search(self.variable_ref_patt, v): elif isinstance(v, str) and re.search(self.variable_ref_patt, v):
elements = self.get_input_elements_from_text(v) elements = self.get_input_elements_from_text(v)
kv = {k: e.get('value', '') for k, e in elements.items()} kv = {k: e.get('value', '') for k, e in elements.items()}
self.set_input_value(var, self.string_format(v, kv)) self.set_input_value(var, self.string_format(v, kv))
_logger.debug("[Base] var '%s': resolved text refs '%s' -> %s", var, v, json.dumps(kv, ensure_ascii=False, default=str)[:200])
else: else:
self.set_input_value(var, v) self.set_input_value(var, v)
_logger.debug("[Base] var '%s': literal value -> %s", var, json.dumps(v, ensure_ascii=False, default=str)[:200])
res[var] = self.get_input_value(var) res[var] = self.get_input_value(var)
return res return res

View File

@@ -284,12 +284,18 @@ class Message(ComponentBase):
return return
for n, v in kwargs.items(): for n, v in kwargs.items():
content = re.sub(n, v, content) if v is not None:
content = re.sub(n, str(v), content)
self.set_output("downloads", downloads) self.set_output("downloads", downloads)
self.set_output("content", content) self.set_output("content", content)
self._convert_content(content) self._convert_content(content)
self._save_to_memory(content) try:
loop = asyncio.get_running_loop()
except RuntimeError:
asyncio.run(self._save_to_memory(content))
else:
asyncio.run_coroutine_threadsafe(self._save_to_memory(content), loop)
def thoughts(self) -> str: def thoughts(self) -> str:
return "" return ""