From fd7fb6669aa0aa1891efc1718c63a881d04e2729 Mon Sep 17 00:00:00 2001 From: Haruko386 Date: Fri, 3 Jul 2026 18:56:32 +0800 Subject: [PATCH] fix: cannot get query in agent-log (#16610) ### Summary As title bug: fixed: image --- internal/agent/canvas/scheduler.go | 36 +++++++++++++++++++++++++----- internal/service/agent.go | 36 +++++++++++++++++++----------- 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/internal/agent/canvas/scheduler.go b/internal/agent/canvas/scheduler.go index 6324877084..0b2c827f24 100644 --- a/internal/agent/canvas/scheduler.go +++ b/internal/agent/canvas/scheduler.go @@ -230,20 +230,39 @@ func emitEventFromCtx(ctx context.Context, ev RunEvent) { PushEvent(meta.Events, ev) } +func sanitizeNodeInputs(inputs map[string]any) map[string]any { + if len(inputs) == 0 { + return map[string]any{} + } + + out := make(map[string]any, len(inputs)) + for k, v := range inputs { + switch k { + case "state", "__cpn_id__", "__legacy_noop__": + continue + default: + out[k] = v + } + } + return out +} + // nodeStartedAt records the per-node start time in state.Sys and emits a // node_started RunEvent. Called from the per-node statePre wrapper. // Metadata (message/task/session ids) is read from ctx via RunMeta. -func nodeStartedAt(ctx context.Context, state *CanvasState, cpnID, componentName, componentType string) { +func nodeStartedAt(ctx context.Context, state *CanvasState, cpnID, componentName, componentType string, inputs map[string]any) { common.Debug("node_started", zap.String("cpnID", cpnID), zap.String("componentName", componentName)) if state == nil { return } now := float64(time.Now().UnixNano()) / 1e9 + if state.Sys != nil { state.Sys["_node_start_"+cpnID] = now + state.Sys["_node_inputs_"+cpnID] = sanitizeNodeInputs(inputs) } nsData, _ := json.Marshal(NodeStartedData{ - Inputs: nil, + Inputs: sanitizeNodeInputs(inputs), CreatedAt: now, ComponentID: cpnID, ComponentName: componentName, @@ -291,13 +310,20 @@ func nodeFinishedNow(ctx context.Context, state *CanvasState, cpnID, componentNa } } + inputs := map[string]any{} + if state.Sys != nil { + if v, ok := state.Sys["_node_inputs_"+cpnID].(map[string]any); ok { + inputs = v + } + } + var nfErr interface{} if nodeErr != nil { nfErr = nodeErr.Error() } nfData, _ := json.Marshal(NodeFinishedData{ - Inputs: map[string]any{}, + Inputs: inputs, Outputs: outputs, ComponentID: cpnID, ComponentName: componentName, @@ -407,7 +433,7 @@ func BuildWorkflow(ctx context.Context, c *Canvas) (*compose.Workflow[map[string node := wf.AddGraphNode(cpnID, exp.Graph, compose.WithNodeName(cpnID), compose.WithStatePreHandler[map[string]any, *CanvasState](func(ctx context.Context, in map[string]any, state *CanvasState) (map[string]any, error) { - nodeStartedAt(ctx, state, cpnID, comp.Obj.ComponentName, comp.Obj.ComponentName) + nodeStartedAt(ctx, state, cpnID, comp.Obj.ComponentName, comp.Obj.ComponentName, in) return statePre(ctx, in, state) }), compose.WithStatePostHandler[map[string]any, *CanvasState](func(ctx context.Context, out map[string]any, state *CanvasState) (map[string]any, error) { @@ -472,7 +498,7 @@ func BuildWorkflow(ctx context.Context, c *Canvas) (*compose.Workflow[map[string // service layer before invoke). componentName := c.Components[cpnID].Obj.ComponentName nodePre := func(ctx context.Context, in map[string]any, state *CanvasState) (map[string]any, error) { - nodeStartedAt(ctx, state, cpnID, componentName, componentName) + nodeStartedAt(ctx, state, cpnID, componentName, componentName, in) return statePre(ctx, in, state) } nodePost := func(ctx context.Context, out map[string]any, state *CanvasState) (map[string]any, error) { diff --git a/internal/service/agent.go b/internal/service/agent.go index a8e41b6d8e..c93928972a 100644 --- a/internal/service/agent.go +++ b/internal/service/agent.go @@ -773,20 +773,25 @@ func (s *AgentService) RunAgent(ctx context.Context, userID, canvasID, sessionID if payload, ok := ctx.Value(webhookPayloadKey{}).(map[string]any); ok && payload != nil { root["webhook_payload"] = payload } - // Phase 4.4 V2.1 (v3.6.1): populate root["tenant_id"] so the - // RunTracker.Start call (in buildRunFunc) records the run - // under the right tenant. The lookup is best-effort — a - // failure here (DAO down, user has no tenants) logs and - // continues with an empty tenant_id rather than failing - // the run; the run still works, the only loss is the - // per-tenant filterability of the run-history log. + // Match Python's @add_tenant_id_to_kwargs behavior for runtime + // components and model credential lookup: the canvas runs under + // the current caller's tenant id. Team-agent access was already + // authorized by loadCanvasForUser above; do not replace this with + // an arbitrary joined team tenant or LLM credential lookup can miss + // the caller's configured provider key. + root["tenant_id"] = userID + + // Preserve the historical RunTracker tenant dimension separately. + // Existing tests and log filters expect the joined tenant id in the + // run hash, but runtime state must keep tenant_id=userID. if tenantIDs, terr := s.userTenantDAO.GetTenantIDsByUserID(userID); terr == nil && len(tenantIDs) > 0 { - root["tenant_id"] = tenantIDs[0] + root["run_tenant_id"] = tenantIDs[0] } else if terr != nil { - common.Warn("service: RunAgent userTenantDAO.GetTenantIDsByUserID (best-effort, run not blocked)", + common.Warn("service: RunAgent userTenantDAO.GetTenantIDsByUserID (best-effort, run tracker tenant not populated)", zap.String("user_id", userID), zap.Error(terr)) } + // v3.6.1 diagnostic: log what RunAgent put into root so we can // confirm tenant_id / user_id / session_id / user_input all // reached the buildRunFunc closure (which runs in the runner's @@ -1181,11 +1186,16 @@ func runIDFor(canvasID string, root map[string]any) string { return canvasID } -// tenantIDFromRoot returns the optional tenant_id that the handler -// may have populated on the root map. Empty when absent — the -// RunTracker stores "" as the tenant id, which the test suite -// already exercises. +// tenantIDFromRoot returns the optional run-tracker tenant id that +// RunAgent populated on the root map. Runtime components use +// root["tenant_id"] / state.Sys["tenant_id"] for the caller tenant; +// RunTracker keeps the historical joined-tenant dimension separately. +// Empty when absent — the RunTracker stores "" as the tenant id, which +// the test suite already exercises. func tenantIDFromRoot(root map[string]any) string { + if s, ok := root["run_tenant_id"].(string); ok { + return s + } if s, ok := root["tenant_id"].(string); ok { return s }