mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-04 09:39:32 +08:00
fix: cannot get query in agent-log (#16610)
### Summary As title bug: fixed: <img width="1827" height="1286" alt="image" src="https://github.com/user-attachments/assets/0cdc391c-43d7-4330-bc34-3aefe5d4f4ee" />
This commit is contained in:
@@ -230,20 +230,39 @@ func emitEventFromCtx(ctx context.Context, ev RunEvent) {
|
||||
PushEvent(meta.Events, ev)
|
||||
}
|
||||
|
||||
func sanitizeNodeInputs(inputs map[string]any) map[string]any {
|
||||
if len(inputs) == 0 {
|
||||
return map[string]any{}
|
||||
}
|
||||
|
||||
out := make(map[string]any, len(inputs))
|
||||
for k, v := range inputs {
|
||||
switch k {
|
||||
case "state", "__cpn_id__", "__legacy_noop__":
|
||||
continue
|
||||
default:
|
||||
out[k] = v
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// nodeStartedAt records the per-node start time in state.Sys and emits a
|
||||
// node_started RunEvent. Called from the per-node statePre wrapper.
|
||||
// Metadata (message/task/session ids) is read from ctx via RunMeta.
|
||||
func nodeStartedAt(ctx context.Context, state *CanvasState, cpnID, componentName, componentType string) {
|
||||
func nodeStartedAt(ctx context.Context, state *CanvasState, cpnID, componentName, componentType string, inputs map[string]any) {
|
||||
common.Debug("node_started", zap.String("cpnID", cpnID), zap.String("componentName", componentName))
|
||||
if state == nil {
|
||||
return
|
||||
}
|
||||
now := float64(time.Now().UnixNano()) / 1e9
|
||||
|
||||
if state.Sys != nil {
|
||||
state.Sys["_node_start_"+cpnID] = now
|
||||
state.Sys["_node_inputs_"+cpnID] = sanitizeNodeInputs(inputs)
|
||||
}
|
||||
nsData, _ := json.Marshal(NodeStartedData{
|
||||
Inputs: nil,
|
||||
Inputs: sanitizeNodeInputs(inputs),
|
||||
CreatedAt: now,
|
||||
ComponentID: cpnID,
|
||||
ComponentName: componentName,
|
||||
@@ -291,13 +310,20 @@ func nodeFinishedNow(ctx context.Context, state *CanvasState, cpnID, componentNa
|
||||
}
|
||||
}
|
||||
|
||||
inputs := map[string]any{}
|
||||
if state.Sys != nil {
|
||||
if v, ok := state.Sys["_node_inputs_"+cpnID].(map[string]any); ok {
|
||||
inputs = v
|
||||
}
|
||||
}
|
||||
|
||||
var nfErr interface{}
|
||||
if nodeErr != nil {
|
||||
nfErr = nodeErr.Error()
|
||||
}
|
||||
|
||||
nfData, _ := json.Marshal(NodeFinishedData{
|
||||
Inputs: map[string]any{},
|
||||
Inputs: inputs,
|
||||
Outputs: outputs,
|
||||
ComponentID: cpnID,
|
||||
ComponentName: componentName,
|
||||
@@ -407,7 +433,7 @@ func BuildWorkflow(ctx context.Context, c *Canvas) (*compose.Workflow[map[string
|
||||
node := wf.AddGraphNode(cpnID, exp.Graph,
|
||||
compose.WithNodeName(cpnID),
|
||||
compose.WithStatePreHandler[map[string]any, *CanvasState](func(ctx context.Context, in map[string]any, state *CanvasState) (map[string]any, error) {
|
||||
nodeStartedAt(ctx, state, cpnID, comp.Obj.ComponentName, comp.Obj.ComponentName)
|
||||
nodeStartedAt(ctx, state, cpnID, comp.Obj.ComponentName, comp.Obj.ComponentName, in)
|
||||
return statePre(ctx, in, state)
|
||||
}),
|
||||
compose.WithStatePostHandler[map[string]any, *CanvasState](func(ctx context.Context, out map[string]any, state *CanvasState) (map[string]any, error) {
|
||||
@@ -472,7 +498,7 @@ func BuildWorkflow(ctx context.Context, c *Canvas) (*compose.Workflow[map[string
|
||||
// service layer before invoke).
|
||||
componentName := c.Components[cpnID].Obj.ComponentName
|
||||
nodePre := func(ctx context.Context, in map[string]any, state *CanvasState) (map[string]any, error) {
|
||||
nodeStartedAt(ctx, state, cpnID, componentName, componentName)
|
||||
nodeStartedAt(ctx, state, cpnID, componentName, componentName, in)
|
||||
return statePre(ctx, in, state)
|
||||
}
|
||||
nodePost := func(ctx context.Context, out map[string]any, state *CanvasState) (map[string]any, error) {
|
||||
|
||||
@@ -773,20 +773,25 @@ func (s *AgentService) RunAgent(ctx context.Context, userID, canvasID, sessionID
|
||||
if payload, ok := ctx.Value(webhookPayloadKey{}).(map[string]any); ok && payload != nil {
|
||||
root["webhook_payload"] = payload
|
||||
}
|
||||
// Phase 4.4 V2.1 (v3.6.1): populate root["tenant_id"] so the
|
||||
// RunTracker.Start call (in buildRunFunc) records the run
|
||||
// under the right tenant. The lookup is best-effort — a
|
||||
// failure here (DAO down, user has no tenants) logs and
|
||||
// continues with an empty tenant_id rather than failing
|
||||
// the run; the run still works, the only loss is the
|
||||
// per-tenant filterability of the run-history log.
|
||||
// Match Python's @add_tenant_id_to_kwargs behavior for runtime
|
||||
// components and model credential lookup: the canvas runs under
|
||||
// the current caller's tenant id. Team-agent access was already
|
||||
// authorized by loadCanvasForUser above; do not replace this with
|
||||
// an arbitrary joined team tenant or LLM credential lookup can miss
|
||||
// the caller's configured provider key.
|
||||
root["tenant_id"] = userID
|
||||
|
||||
// Preserve the historical RunTracker tenant dimension separately.
|
||||
// Existing tests and log filters expect the joined tenant id in the
|
||||
// run hash, but runtime state must keep tenant_id=userID.
|
||||
if tenantIDs, terr := s.userTenantDAO.GetTenantIDsByUserID(userID); terr == nil && len(tenantIDs) > 0 {
|
||||
root["tenant_id"] = tenantIDs[0]
|
||||
root["run_tenant_id"] = tenantIDs[0]
|
||||
} else if terr != nil {
|
||||
common.Warn("service: RunAgent userTenantDAO.GetTenantIDsByUserID (best-effort, run not blocked)",
|
||||
common.Warn("service: RunAgent userTenantDAO.GetTenantIDsByUserID (best-effort, run tracker tenant not populated)",
|
||||
zap.String("user_id", userID),
|
||||
zap.Error(terr))
|
||||
}
|
||||
|
||||
// v3.6.1 diagnostic: log what RunAgent put into root so we can
|
||||
// confirm tenant_id / user_id / session_id / user_input all
|
||||
// reached the buildRunFunc closure (which runs in the runner's
|
||||
@@ -1181,11 +1186,16 @@ func runIDFor(canvasID string, root map[string]any) string {
|
||||
return canvasID
|
||||
}
|
||||
|
||||
// tenantIDFromRoot returns the optional tenant_id that the handler
|
||||
// may have populated on the root map. Empty when absent — the
|
||||
// RunTracker stores "" as the tenant id, which the test suite
|
||||
// already exercises.
|
||||
// tenantIDFromRoot returns the optional run-tracker tenant id that
|
||||
// RunAgent populated on the root map. Runtime components use
|
||||
// root["tenant_id"] / state.Sys["tenant_id"] for the caller tenant;
|
||||
// RunTracker keeps the historical joined-tenant dimension separately.
|
||||
// Empty when absent — the RunTracker stores "" as the tenant id, which
|
||||
// the test suite already exercises.
|
||||
func tenantIDFromRoot(root map[string]any) string {
|
||||
if s, ok := root["run_tenant_id"].(string); ok {
|
||||
return s
|
||||
}
|
||||
if s, ok := root["tenant_id"].(string); ok {
|
||||
return s
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user