From 78db4e949b50314ba943bd718efc84df3739020a Mon Sep 17 00:00:00 2001 From: Willsgao <43256529+Willsgao@users.noreply.github.com> Date: Mon, 29 Jun 2026 09:41:16 +0800 Subject: [PATCH] feat(agent): add module-level debug logging for canvas execution flow (#16200) Summary Add module-level debug logging to track Agent canvas execution flow (Closes #9306), enabling developers to diagnose component invocation, input/output states, and variable resolution without modifying production code. Also address related bugs in message.py: re.sub being called with None or non-string replacement values, and the _save_to_memory coroutine never being run, causing silent memory save failures. Changes agent/canvas.py: log workflow start, component invocation, and component completion agent/component/agent_with_tools.py: log Agent parameter resolution and LLM invocation path; standardize json.dumps usage agent/component/base.py: log get_input() variable resolution branches agent/component/message.py: skip None values and pass str(v) as the re.sub replacement; run the _save_to_memory coroutine via asyncio.run, or schedule it on the already-running event loop Design Uses module-level loggers (logging.getLogger(__name__)) to support selective debugging: LOG_LEVELS=agent=DEBUG Low overhead in production (INFO level by default): debug records are discarded, although the json.dumps(...) arguments passed to the debug calls are still evaluated at each call site Works with existing PUT /system/config/log API for runtime level changes Closes #9306 Note: While adding debug logging, I discovered and addressed two related bugs in message.py: - re.sub was called with the raw replacement value; None values are now skipped and other values are converted with str() (backslash sequences in the replacement are still interpreted by re.sub rather than treated as a literal string) - _save_to_memory coroutine was never awaited, causing silent failures; it is now run with asyncio.run, or scheduled on the running loop without waiting for its result --------- Co-authored-by: wills --- agent/canvas.py | 24 +++++++++++++++++++++++- agent/component/agent_with_tools.py | 16 ++++++++++++++++ agent/component/base.py | 16 ++++++++++++++-- agent/component/message.py | 10 ++++++++-- 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index 22b42596cb..86fd8689ba 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -40,6 +40,8 @@ from rag.prompts.generator import 
chunks_format from rag.utils.redis_conn import REDIS_CONN from rag.utils.tts_cache import synthesize_with_cache +_logger = logging.getLogger(__name__) + class Graph: """ dsl = { @@ -438,6 +440,11 @@ class Canvas(Graph): if not is_resume: yield decorate("workflow_started", {"inputs": kwargs.get("inputs")}) + _logger.debug( + "[Canvas] Workflow started. Path: %s, Inputs: %s", + [self.get_component_name(c) for c in self.path], + json.dumps(kwargs.get("inputs", {}), ensure_ascii=False, default=str)[:500], + ) self.retrieval.append({"chunks": {}, "doc_aggs": {}}) async def _run_batch(f, t): @@ -482,6 +489,13 @@ class Canvas(Graph): if task_fn is None: continue + _logger.debug( + "[Canvas] Invoking component '%s' (%s) with inputs: %s", + self.get_component_name(self.path[i - 1]), + cpn.component_name, + json.dumps(call_kwargs, ensure_ascii=False, default=str)[:500], + ) + fn_invoke_async = getattr(cpn, "_invoke_async", None) use_async = (fn_invoke_async and asyncio.iscoroutinefunction(fn_invoke_async)) or asyncio.iscoroutinefunction(getattr(cpn, "_invoke", None)) tasks.append(asyncio.create_task(_invoke_one(cpn, task_fn, call_kwargs, use_async))) @@ -490,9 +504,17 @@ class Canvas(Graph): await asyncio.gather(*tasks) def _node_finished(cpn_obj): + outputs = cpn_obj.output() + _logger.debug( + "[Canvas] Component '%s' (%s) finished. 
Outputs: %s, Error: %s", + self.get_component_name(cpn_obj._id), + self.get_component_type(cpn_obj._id), + json.dumps(outputs, ensure_ascii=False, default=str)[:500], + cpn_obj.error(), + ) return decorate("node_finished",{ "inputs": cpn_obj.get_input_values(), - "outputs": cpn_obj.output(), + "outputs": outputs, "component_id": cpn_obj._id, "component_name": self.get_component_name(cpn_obj._id), "component_type": self.get_component_type(cpn_obj._id), diff --git a/agent/component/agent_with_tools.py b/agent/component/agent_with_tools.py index 57dbaeaa65..6ac4f220db 100644 --- a/agent/component/agent_with_tools.py +++ b/agent/component/agent_with_tools.py @@ -34,6 +34,8 @@ from common.connection_utils import timeout from common.mcp_tool_call_conn import MCPToolBinding, MCPToolCallSession, mcp_tool_metadata_to_openai_tool from rag.prompts.generator import citation_plus, citation_prompt, full_question, kb_prompt, message_fit_in, structured_output_prompt +_logger = logging.getLogger(__name__) + class AgentParam(LLMParam, ToolParamBase): """ @@ -194,6 +196,14 @@ class Agent(LLM, ToolBase): if self.check_if_canceled("Agent processing"): return + _logger.debug( + "[Agent] _invoke_async called. Component: %s, Keys in kwargs: %s, user_prompt: %s, tools count: %d", + self._id, + list(kwargs.keys()), + json.dumps(kwargs.get("user_prompt", ""), ensure_ascii=False, default=str)[:300], + len(self.tools) if self.tools else 0, + ) + if kwargs.get("user_prompt"): usr_pmt = "" if kwargs.get("reasoning"): @@ -205,10 +215,13 @@ class Agent(LLM, ToolBase): else: usr_pmt = str(kwargs["user_prompt"]) self._param.prompts = [{"role": "user", "content": usr_pmt}] + _logger.debug("[Agent] Built user prompt with length=%d, reasoning=%s, context=%s", + len(usr_pmt), bool(kwargs.get("reasoning")), bool(kwargs.get("context"))) if not self.tools: if self.check_if_canceled("Agent processing"): return + _logger.debug("[Agent] No tools configured. Delegating to LLM._invoke_async. 
prompt_count=%d", len(self._param.prompts) if self._param.prompts else 0) return await LLM._invoke_async(self, **kwargs) prompt, msg, user_defined_prompt = self._prepare_prompt_variables() @@ -223,11 +236,13 @@ class Agent(LLM, ToolBase): ex = self.exception_handler() has_message_downstream = any(self._canvas.get_component_obj(cid).component_name.lower() == "message" for cid in downstreams) if has_message_downstream and not (ex and ex["goto"]) and not output_schema: + _logger.debug("[Agent] Entering streaming mode (has message downstream)") self.set_output("content", partial(self.stream_output_with_tools_async, prompt, deepcopy(msg), user_defined_prompt)) return msg = self._fit_messages(prompt, msg) self._append_system_prompt(msg, schema_prompt) + _logger.debug("[Agent] Calling LLM with %d messages, has_schema=%s", len(msg), bool(schema_prompt)) ans = await self._generate_async(msg) if ans.find("**ERROR**") >= 0: @@ -257,6 +272,7 @@ class Agent(LLM, ToolBase): artifact_md = self._collect_tool_artifact_markdown(existing_text=ans) if artifact_md: ans += "\n\n" + artifact_md + _logger.debug("[Agent] Final output. content_length=%d, has_artifact=%s", len(ans), bool(artifact_md)) self.set_output("content", ans) return ans diff --git a/agent/component/base.py b/agent/component/base.py index af1d2306de..c0ddea168a 100644 --- a/agent/component/base.py +++ b/agent/component/base.py @@ -31,6 +31,8 @@ from common.connection_utils import timeout from common.misc_utils import thread_pool_exec +_logger = logging.getLogger(__name__) + _FEEDED_DEPRECATED_PARAMS = "_feeded_deprecated_params" _DEPRECATED_PARAMS = "_deprecated_params" _USER_FEEDED_PARAMS = "_user_feeded_params" @@ -481,18 +483,28 @@ class ComponentBase(ABC): return self._param.inputs.get(key, {}).get("value") res = {} - for var, o in self.get_input_elements().items(): + input_elements = self.get_input_elements() + _logger.debug( + "[Base] Component '%s' (%s) resolving inputs. 
Input element keys: %s", + self._id, self.component_name, list(input_elements.keys()), + ) + for var, o in input_elements.items(): v = self.get_param(var) if v is None: + _logger.debug("[Base] var '%s': param is None, skipping", var) continue if isinstance(v, str) and self._canvas.is_reff(v): - self.set_input_value(var, self._canvas.get_variable_value(v)) + resolved = self._canvas.get_variable_value(v) + self.set_input_value(var, resolved) + _logger.debug("[Base] var '%s': resolved ref '%s' -> %s", var, v, json.dumps(resolved, ensure_ascii=False, default=str)[:200]) elif isinstance(v, str) and re.search(self.variable_ref_patt, v): elements = self.get_input_elements_from_text(v) kv = {k: e.get('value', '') for k, e in elements.items()} self.set_input_value(var, self.string_format(v, kv)) + _logger.debug("[Base] var '%s': resolved text refs '%s' -> %s", var, v, json.dumps(kv, ensure_ascii=False, default=str)[:200]) else: self.set_input_value(var, v) + _logger.debug("[Base] var '%s': literal value -> %s", var, json.dumps(v, ensure_ascii=False, default=str)[:200]) res[var] = self.get_input_value(var) return res diff --git a/agent/component/message.py b/agent/component/message.py index 5ab7c6ef52..359bedbe58 100644 --- a/agent/component/message.py +++ b/agent/component/message.py @@ -284,12 +284,18 @@ class Message(ComponentBase): return for n, v in kwargs.items(): - content = re.sub(n, v, content) + if v is not None: + content = re.sub(n, str(v), content) self.set_output("downloads", downloads) self.set_output("content", content) self._convert_content(content) - self._save_to_memory(content) + try: + loop = asyncio.get_running_loop() + except RuntimeError: + asyncio.run(self._save_to_memory(content)) + else: + asyncio.run_coroutine_threadsafe(self._save_to_memory(content), loop) def thoughts(self) -> str: return ""