mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
## Summary
Aligns the **Go agent runtime/canvas/components/tools** behavior with
the **Python `agent/` implementation** so the same stored canvas DSL
produces the same execution result on either side. Every component,
tool, and runtime primitive in `internal/agent/` is now driven by the
same semantics as its Python counterpart — variable resolution, template
substitution, control flow, error reporting, retry/cancel, and stream
event shapes.
The **retrieval component is the one explicit exception** in this PR. It
is being reworked in a separate change and is excluded from this
alignment pass; the wrapper slot (`universe_a_wrappers.go →
newRetrievalComponent`) is preserved.
## Scope of alignment
### Components (all aligned with `agent/component/`)
`Begin` · `Message` · `LLM` (incl. ChatTemplateKwargs,
MessageHistoryWindowSize, VisualFiles, Cite, OutputStructure,
JSONOutput, TopP, MaxRetries, DelayAfterError, credentials) · `Agent`
(react + tool artifact capture + `Reset()` interface-assert) · `Switch`
(12/12 operators, Python-equivalent semantics) · `Categorize` · `Invoke`
· `Iteration` · `Loop` (macro-expansion through `workflowx.AddLoopNode`)
· `UserFillUp` (Python-equivalent interrupt/resume via eino
`compose.Interrupt`/`ResumeWithData`) · `FillUp` · `DataOperations` ·
`ListOperations` · `StringTransform` · `VariableAggregator` ·
`VariableAssigner` · `Browser` (full stagehand runtime parity) ·
`DocsGenerator` · `ExcelProcessor`.
### Tools (all aligned with `agent/tools/`)
`Retrieval` (wrapper slot only — logic out of scope) · `MCPToolAdapter`
(streamable-HTTP) · `CodeExec` (sandbox bridge with
`code_exec_contract.go` matching Python contract) · `AkShare` · `ArXiv`
· `Crawler` · `DeepL` · `DuckDuckGo` · `Email` · `ExeSQL` · `GitHub` ·
`Google` · `GoogleScholar` · `Jin10` · `PubMed` · `QWeather` · `SearXNG`
· `Tavily` · `Tushare` · `Wencai` · `Wikipedia` · `YahooFinance` —
uniform `eino tool.InvokableTool` interface, SSRF protection, shared
HTTP client.
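The SSRF protection mentioned above is not detailed in this description. As a rough sketch only, assuming the guard refuses URLs whose host is a literal loopback, private, link-local, or unspecified IP address (the helper name `isBlockedHost` is hypothetical, and a production guard would also need to check the addresses a hostname resolves to):

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// isBlockedHost is a hypothetical helper, not the project's actual guard.
// It reports whether rawURL points at a literal IP address that an SSRF
// filter would typically refuse. Hostnames are not resolved here.
func isBlockedHost(rawURL string) bool {
	u, err := url.Parse(rawURL)
	if err != nil || u.Hostname() == "" {
		return true // unparseable or host-less URLs are refused
	}
	ip := net.ParseIP(u.Hostname())
	if ip == nil {
		return false // a hostname; DNS-level checks are out of scope for this sketch
	}
	return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() || ip.IsUnspecified()
}

func main() {
	for _, target := range []string{
		"http://127.0.0.1:8080/admin",
		"http://10.0.0.5/",
		"https://93.184.216.34/",
	} {
		fmt.Printf("%s blocked=%v\n", target, isBlockedHost(target))
	}
}
```

Checking only literal IPs keeps the sketch short; it is not sufficient on its own, since a hostname can resolve to an internal address.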
### Canvas execution engine (`internal/agent/canvas/`)
Aligned with Python's `agent/canvas.py`:
- **Scheduler** (`scheduler.go`): state pre/post handlers, node lambdas,
per-component timeout resolver (4-level: per-class env → per-class table
→ uniform env → 600s fallback), `legacyNoOpNames`.
- **Loop subgraph** (`loop_subgraph.go`): Python-equivalent
`AddLoopNode` macro expansion + condition translation.
- **Multibranch** (`multibranch.go`): `Switch` / `Categorize` routing
via `compose.NewGraphMultiBranch` — same branch selection semantics as
Python.
- **Parallel subgraph** (`parallel_subgraph.go`): matches Python's
parallel fan-out contract.
- **Interrupt/Resume** (`interrupt_resume.go`): `UserFillUpNodeBody` /
`IsInterruptError` / `ExtractInterruptContexts` — replaces the
deprecated Python sentinel chain with eino's native interrupt API,
preserving the same external behavior.
- **Checkpoint** (`checkpoint_store.go`): `RedisCheckPointStore`
Get/Set/Delete, with business metadata (status / canvas_id /
parent_run_id) on a parallel Redis Hash.
- **RunTracker** (`run_tracker.go`): Start / MarkSucceeded / MarkFailed
/ MarkCancelled / AttachCheckpoint — same lifecycle as the Python run
record.
- **Cancel** (`cancel.go`): Redis pub/sub watch.
- **Stream** (`stream.go`): SSE channel with `messages` / `waiting` /
`errors` / `done` events, same shape as Python's `agent.canvas.RunEvent`
payload.
### DSL bridge (`internal/agent/dsl/`)
- `normalize.go`: v1↔v2 collapsed into a single wire format — Python and
Go consume the same stored JSON.
- `reset.go`: per-run state reset matches Python's `Canvas.reset()`
semantics.
- Testdata mirrors Python's `agent_msg.json` / `all.json` / etc.
### Runtime (`internal/agent/runtime/`)
- `CanvasState` / `NewCanvasState` / `GetVar` / `SetVar` / `ReadVars`:
same `{{cpn_id@param}}` resolution model.
- `ResolveTemplate` (regex fast path + gonja fallback) — Python
Jinja-style semantics.
- `selector.go`, `metrics.go`, `component.go`: shared runtime contracts.
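To make the `{{cpn_id@param}}` resolution model concrete, here is a minimal sketch of a regex-only substitution pass. It is an illustration under assumptions, not the runtime's `ResolveTemplate`: the function name `resolveRefs`, the flat variable map, and the choice to return an error on an unknown reference are all hypothetical, and the gonja fallback is not modelled.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// refPattern matches {{ name }} where name has no braces or whitespace,
// e.g. {{sys.query}} or {{llm_0@content}}.
var refPattern = regexp.MustCompile(`\{\{\s*([^{}\s]+)\s*\}\}`)

// resolveRefs is a hypothetical helper: it replaces every reference with
// the matching value from vars and fails on the first unknown reference.
func resolveRefs(tmpl string, vars map[string]any) (string, error) {
	var missing string
	out := refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := strings.TrimSpace(m[2 : len(m)-2])
		v, ok := vars[key]
		if !ok {
			if missing == "" {
				missing = key
			}
			return m
		}
		return fmt.Sprint(v)
	})
	if missing != "" {
		return "", fmt.Errorf("unresolved reference %q", missing)
	}
	return out, nil
}

func main() {
	vars := map[string]any{"sys.query": "hello", "llm_0@content": "world"}
	out, err := resolveRefs("{{sys.query}}, {{ llm_0@content }}!", vars)
	fmt.Println(out, err)
}
```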
## Out of scope (intentionally)
- **`Retrieval` component logic** — wrapped only; full parity lands in a
follow-up PR.
- **Frontend** — only minor dsl-bridge / canvas UX fixes ride along.
- **CLI / admin / model registry** — orthogonal to agent behavior.
## How alignment is verified
`internal/service/agent_run_e2e_test.go` exercises the **full production
chain** against real Python-shaped DSL fixtures:
```
loadCanvasForUser → versionDAO.GetLatest → decodeCanvasFromDSL →
canvas.Compile → cc.Workflow.Invoke → answer extraction
```
using in-memory SQLite + miniredis (no Docker). Covers:
- `TestRunAgent_RealCanvas_BeginMessage` — happy path, `{{sys.query}}`
resolution
- `TestRunAgent_RealCanvas_WaitForUserResume` — two-run resume cycle
(Python-equivalent)
- `TestRunAgent_RealCanvas_CompileFails` — unknown component name →
sanitized error (Python-equivalent)
- `TestRunAgent_RealCanvas_InvokeFails` — unresolvable template ref
(Python-equivalent)
- `TestRunAgent_RunTracker_AttachCheckpoint_CallSequence` —
Start→AttachCheckpoint→MarkSucceeded lifecycle
`internal/handler/agent_test.go` — SSE streaming parity (`Content-Type:
text/event-stream`, `data: {…}\n\n`, trailing `data: [DONE]\n\n`,
OpenAI-compatible non-stream `choices`).
`internal/agent/canvas/fixture_compile_test.go` + per-component tests
pin the Python-equivalent outputs.
```
go test -count=1 -v -run 'TestRunAgent_RealCanvas|TestRunAgent_RunTracker' ./internal/service/
```
## Design reference
`docs/develop/agent-go-port-design.md` (1329 lines, last cross-checked
2026-06-17) — module layout, per-component / per-tool inventory,
corner-case catalogue, and the actionable backlog (Section 14, including
the retrieval alignment follow-up).
---------
Co-authored-by: Claude <noreply@anthropic.com>
543 lines
15 KiB
Go
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package component: DataOperations (T3, plan §2.11.3 row 16).
//
// DataOperations applies one of seven dict/list transforms to a list
// of dicts pulled from the canvas state. It is pure: no state writes;
// the transformed payload is returned at outputs["result"].
//
// Operations:
//   - select_keys     : keep only the listed keys per dict
//   - literal_eval    : walk input_objects; try to parse JSON-like
//                       string leaves (the Go port uses json.Unmarshal
//                       as a stand-in for Python's ast.literal_eval;
//                       tuples/sets are NOT supported, matching the
//                       JSON-shaped LLM output the canvas typically
//                       consumes).
//   - combine         : merge all input dicts into one
//   - filter_values   : keep dicts matching all rules
//   - append_or_update: apply updates [{key, value}] per dict
//   - remove_keys     : drop the listed keys per dict
//   - rename_keys     : rename per [{old_key, new_key}] per dict
//
// Mirrors agent/component/data_operations.py.
package component

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"ragflow/internal/agent/runtime"
)

const componentNameDataOperations = "DataOperations"

// dataOperationsParam is the static configuration.
type dataOperationsParam struct {
	Query        []string         `json:"query"`
	Operations   string           `json:"operations"`
	SelectKeys   []string         `json:"select_keys"`
	FilterValues []map[string]any `json:"filter_values"`
	Updates      []map[string]any `json:"updates"`
	RemoveKeys   []string         `json:"remove_keys"`
	RenameKeys   []map[string]any `json:"rename_keys"`
}

// Update copies a fresh param map into the receiver. A missing or
// empty "operations" value defaults to "literal_eval".
func (p *dataOperationsParam) Update(conf map[string]any) error {
	if conf == nil {
		conf = map[string]any{}
	}
	p.Query = toStringSlice(conf["query"])
	p.Operations, _ = conf["operations"].(string)
	if p.Operations == "" {
		p.Operations = "literal_eval"
	}
	p.SelectKeys = toStringSlice(conf["select_keys"])
	p.FilterValues = toMapSlice(conf["filter_values"])
	p.Updates = toMapSlice(conf["updates"])
	p.RemoveKeys = toStringSlice(conf["remove_keys"])
	p.RenameKeys = toMapSlice(conf["rename_keys"])
	return nil
}

// Check validates the param.
func (p *dataOperationsParam) Check() error {
	switch p.Operations {
	case "select_keys", "literal_eval", "combine", "filter_values",
		"append_or_update", "remove_keys", "rename_keys":
		// ok
	default:
		return &ParamError{
			Field:  "operations",
			Reason: "must be one of: select_keys, literal_eval, combine, filter_values, append_or_update, remove_keys, rename_keys",
		}
	}
	return nil
}

// AsDict returns the params as a plain map.
func (p *dataOperationsParam) AsDict() map[string]any {
	return map[string]any{
		"query":         p.Query,
		"operations":    p.Operations,
		"select_keys":   p.SelectKeys,
		"filter_values": p.FilterValues,
		"updates":       p.Updates,
		"remove_keys":   p.RemoveKeys,
		"rename_keys":   p.RenameKeys,
	}
}

// toStringSlice normalizes a value to []string. Strings (CSV), []any,
// and []string are accepted; nil and any other type return nil.
func toStringSlice(v any) []string {
	switch x := v.(type) {
	case nil:
		return nil
	case string:
		// CSV fallback: "a,b,c" → ["a","b","c"]
		parts := strings.Split(x, ",")
		out := make([]string, 0, len(parts))
		for _, p := range parts {
			s := strings.TrimSpace(p)
			if s != "" {
				out = append(out, s)
			}
		}
		return out
	case []any:
		// Non-string elements are dropped.
		out := make([]string, 0, len(x))
		for _, item := range x {
			if s, ok := item.(string); ok {
				out = append(out, s)
			}
		}
		return out
	case []string:
		return append([]string{}, x...)
	}
	return nil
}

// toMapSlice normalizes a value to []map[string]any. Elements of a
// []any that are not maps are dropped.
func toMapSlice(v any) []map[string]any {
	switch x := v.(type) {
	case nil:
		return nil
	case []any:
		out := make([]map[string]any, 0, len(x))
		for _, item := range x {
			if m, ok := item.(map[string]any); ok {
				out = append(out, m)
			}
		}
		return out
	case []map[string]any:
		return append([]map[string]any{}, x...)
	}
	return nil
}

// DataOperationsComponent implements the 7 dict transforms.
type DataOperationsComponent struct {
	name  string
	param dataOperationsParam
}

// NewDataOperationsComponent constructs a DataOperations from the
// DSL param map.
func NewDataOperationsComponent(params map[string]any) (Component, error) {
	p := &dataOperationsParam{}
	if err := p.Update(params); err != nil {
		return nil, fmt.Errorf("DataOperations: param update: %w", err)
	}
	if err := p.Check(); err != nil {
		return nil, fmt.Errorf("DataOperations: param check: %w", err)
	}
	return &DataOperationsComponent{
		name:  componentNameDataOperations,
		param: *p,
	}, nil
}

// Name returns the registered component name.
func (d *DataOperationsComponent) Name() string { return d.name }

// Invoke loads input_objects from the configured query refs, then
// dispatches to the operation-specific helper.
func (d *DataOperationsComponent) Invoke(ctx context.Context, _ map[string]any) (map[string]any, error) {
	state, _, err := runtime.GetStateFromContext[*runtime.CanvasState](ctx)
	if err != nil {
		return nil, fmt.Errorf("DataOperations: %w", err)
	}
	if state == nil {
		return nil, fmt.Errorf("DataOperations: nil canvas state")
	}

	// param.Query is already a list here: Update runs it through
	// toStringSlice, so a single string from the JSON DSL (which the
	// Python code wraps in [x]) arrives comma-split and trimmed.
	var inputObjects []map[string]any
	for _, ref := range d.param.Query {
		if ref == "" {
			continue
		}
		v, err := state.GetVar(ref)
		if err != nil {
			return nil, fmt.Errorf("DataOperations: query %q: %w", ref, err)
		}
		if v == nil {
			continue
		}
		// Accept a single dict or a list of dicts; any other value
		// type (and any non-dict list element) is skipped.
		switch x := v.(type) {
		case map[string]any:
			inputObjects = append(inputObjects, x)
		case []any:
			for _, item := range x {
				if m, ok := item.(map[string]any); ok {
					inputObjects = append(inputObjects, m)
				}
			}
		}
	}

	// Check() has already restricted Operations to these seven names.
	var result any
	switch d.param.Operations {
	case "select_keys":
		result = d.opSelectKeys(inputObjects)
	case "literal_eval":
		result = d.opLiteralEval(inputObjects)
	case "combine":
		result = d.opCombine(inputObjects)
	case "filter_values":
		result = d.opFilterValues(state, inputObjects)
	case "append_or_update":
		result = d.opAppendOrUpdate(state, inputObjects)
	case "remove_keys":
		result = d.opRemoveKeys(inputObjects)
	case "rename_keys":
		result = d.opRenameKeys(inputObjects)
	}
	return map[string]any{"result": result}, nil
}

// Stream mirrors Invoke; DataOperations is a single-shot transform.
// The returned channel carries exactly one payload and is then closed.
func (d *DataOperationsComponent) Stream(ctx context.Context, inputs map[string]any) (<-chan map[string]any, error) {
	out, err := d.Invoke(ctx, inputs)
	if err != nil {
		return nil, err
	}
	ch := make(chan map[string]any, 1)
	ch <- out
	close(ch)
	return ch, nil
}

// Inputs describes the parameters the component accepts; the keys
// mirror the static DSL param. Invoke in this file reads only the
// static param, so the "overrides" wording below describes intent
// rather than what Invoke currently does.
func (d *DataOperationsComponent) Inputs() map[string]string {
	return map[string]string{
		"query":         "List of data items to operate on. Can override the static DSL param.",
		"operations":    "Operation name: select_keys, literal_eval, combine, filter_values, append_or_update, remove_keys, rename_keys. Overrides DSL param.",
		"select_keys":   "List of keys to keep (for select_keys operation). Overrides DSL param.",
		"filter_values": "List of {key, operator, value} filters (for filter_values operation). Overrides DSL param.",
		"updates":       "List of {key, value} updates (for append_or_update operation). Overrides DSL param.",
		"remove_keys":   "List of keys to remove (for remove_keys operation). Overrides DSL param.",
		"rename_keys":   "List of {old_key, new_key} mappings (for rename_keys operation). Overrides DSL param.",
	}
}

// Outputs returns the transformed payload.
func (d *DataOperationsComponent) Outputs() map[string]string {
	return map[string]string{
		"result": "Transformed payload: a list of dicts for most ops, or a single dict for combine.",
	}
}

// opSelectKeys keeps only the listed keys per dict. Result is []any
// of dicts.
func (d *DataOperationsComponent) opSelectKeys(items []map[string]any) []any {
	keep := make(map[string]struct{}, len(d.param.SelectKeys))
	for _, k := range d.param.SelectKeys {
		keep[k] = struct{}{}
	}
	out := make([]any, 0, len(items))
	for _, item := range items {
		cp := make(map[string]any, len(keep))
		for k, v := range item {
			if _, ok := keep[k]; ok {
				cp[k] = v
			}
		}
		out = append(out, cp)
	}
	return out
}

// opLiteralEval walks the input list and tries to JSON-decode any
// string leaf that looks like a JSON literal. Returns a list of
// (possibly-mutated) dicts.
func (d *DataOperationsComponent) opLiteralEval(items []map[string]any) []any {
	out := make([]any, 0, len(items))
	for _, item := range items {
		out = append(out, recursiveEval(item))
	}
	return out
}

// recursiveEval mirrors the Python _recursive_eval helper: any string
// that starts with a JSON delimiter or known literal is unmarshaled.
// On failure, the original string is returned.
//
// Because the parse itself is json.Unmarshal, inputs that pass the
// prefilter but are not valid JSON (Python-style True/None, tuples in
// parentheses, single-quoted strings) are returned unchanged.
func recursiveEval(v any) any {
	switch x := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(x))
		for k, val := range x {
			out[k] = recursiveEval(val)
		}
		return out
	case []any:
		out := make([]any, 0, len(x))
		for _, item := range x {
			out = append(out, recursiveEval(item))
		}
		return out
	case string:
		s := strings.TrimSpace(x)
		if s == "" {
			return x
		}
		// Prefilter: attempt a parse only when the string starts with
		// one of { [ ( " ' or a digit, or is a scalar literal
		// (true/false/null/none, compared case-insensitively).
		first := s[0]
		lower := strings.ToLower(s)
		isLiteral := false
		switch first {
		case '{', '[', '(', '"', '\'':
			isLiteral = true
		}
		if !isLiteral && first >= '0' && first <= '9' {
			isLiteral = true
		}
		if !isLiteral && (lower == "true" || lower == "false" || lower == "null" || lower == "none") {
			isLiteral = true
		}
		if !isLiteral {
			return x
		}
		// Try JSON. If it fails, return the original string.
		var parsed any
		if err := json.Unmarshal([]byte(s), &parsed); err == nil {
			return parsed
		}
		return x
	}
	return v
}

// opCombine merges all input dicts into one. Key conflicts:
//   - existing is a list → extend (or append if new is scalar)
//   - existing is scalar, new is list → wrap as [old, *new]
//   - existing is scalar, new is scalar → wrap as [old, new]
//
// Merged lists are built in a fresh slice so the input dicts are never
// modified through a shared backing array.
func (d *DataOperationsComponent) opCombine(items []map[string]any) map[string]any {
	out := map[string]any{}
	for _, obj := range items {
		for k, v := range obj {
			existing, ok := out[k]
			if !ok {
				out[k] = v
				continue
			}
			merged := make([]any, 0, 2)
			if ex, ok := existing.([]any); ok {
				merged = append(merged, ex...)
			} else {
				merged = append(merged, existing)
			}
			if vl, ok := v.([]any); ok {
				// Flatten the new list instead of nesting it.
				merged = append(merged, vl...)
			} else {
				merged = append(merged, v)
			}
			out[k] = merged
		}
	}
	return out
}

// opFilterValues keeps dicts where every rule matches. With no rules
// configured, every dict is kept.
func (d *DataOperationsComponent) opFilterValues(state *runtime.CanvasState, items []map[string]any) []any {
	rules := d.param.FilterValues
	out := make([]any, 0, len(items))
	for _, obj := range items {
		if len(rules) == 0 {
			out = append(out, obj)
			continue
		}
		all := true
		for _, rule := range rules {
			if !matchRule(state, obj, rule) {
				all = false
				break
			}
		}
		if all {
			out = append(out, obj)
		}
	}
	return out
}

// matchRule evaluates one filter rule against obj. Mirrors the
// Python match_rule helper. A rule whose key is absent from obj, or
// whose operator is not recognized, never matches.
func matchRule(state *runtime.CanvasState, obj map[string]any, rule map[string]any) bool {
	key, _ := rule["key"].(string)
	if _, ok := obj[key]; !ok {
		return false
	}
	op := strings.ToLower(asString(rule["operator"]))
	if op == "" {
		op = "equals"
	}
	target := normValue(rule["value"])
	// Try to resolve {{...}} in target via state.
	if s, ok := rule["value"].(string); ok && strings.Contains(s, "{{") {
		if resolved, err := runtime.ResolveTemplate(s, state); err == nil {
			target = resolved
		}
	}
	v := normValue(obj[key])
	switch op {
	case "=", "equals":
		return v == target
	case "≠", "!=":
		return v != target
	case "contains":
		return strings.Contains(v, target)
	case "start with":
		return strings.HasPrefix(v, target)
	case "end with":
		return strings.HasSuffix(v, target)
	}
	return false
}

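// asString is a forgiving cast for params that may arrive as int/str.
// A nil value yields "" rather than fmt's "<nil>", so callers that
// test for an empty string (a missing operator or key) behave as
// intended.
func asString(v any) string {
	switch x := v.(type) {
	case nil:
		return ""
	case string:
		return x
	}
	return fmt.Sprintf("%v", v)
}
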
// opAppendOrUpdate copies each dict and applies updates. Values that
// look like {{ref}} are resolved via state; otherwise used as-is.
func (d *DataOperationsComponent) opAppendOrUpdate(state *runtime.CanvasState, items []map[string]any) []any {
	out := make([]any, 0, len(items))
	for _, obj := range items {
		cp := make(map[string]any, len(obj))
		for k, v := range obj {
			cp[k] = v
		}
		for _, upd := range d.param.Updates {
			k := strings.TrimSpace(asString(upd["key"]))
			if k == "" {
				continue
			}
			raw := upd["value"]
			// Resolve {{...}} templates first; fall back to plain
			// state-ref resolution (matches the Python
			// get_value_with_variable behavior: strings are looked
			// up in state when they look like refs).
			if s, ok := raw.(string); ok {
				if strings.Contains(s, "{{") {
					if resolved, err := runtime.ResolveTemplate(s, state); err == nil && resolved != "" {
						cp[k] = resolved
						continue
					}
				}
				if v, err := state.GetVar(s); err == nil && v != nil {
					cp[k] = v
					continue
				}
			}
			cp[k] = raw
		}
		out = append(out, cp)
	}
	return out
}

// opRemoveKeys copies each dict and drops the listed keys.
func (d *DataOperationsComponent) opRemoveKeys(items []map[string]any) []any {
	out := make([]any, 0, len(items))
	for _, obj := range items {
		cp := make(map[string]any, len(obj))
		for k, v := range obj {
			cp[k] = v
		}
		for _, k := range d.param.RemoveKeys {
			// delete is a no-op when the key is absent.
			delete(cp, k)
		}
		out = append(out, cp)
	}
	return out
}

// opRenameKeys copies each dict and renames per the configured pairs.
// Pairs are applied in order; a pair is skipped when either name is
// empty, both names are equal, or the old key is absent.
func (d *DataOperationsComponent) opRenameKeys(items []map[string]any) []any {
	out := make([]any, 0, len(items))
	for _, obj := range items {
		cp := make(map[string]any, len(obj))
		for k, v := range obj {
			cp[k] = v
		}
		for _, pair := range d.param.RenameKeys {
			// oldKey/newKey avoid shadowing the builtin new.
			oldKey := strings.TrimSpace(asString(pair["old_key"]))
			newKey := strings.TrimSpace(asString(pair["new_key"]))
			if oldKey == "" || newKey == "" || oldKey == newKey {
				continue
			}
			if v, ok := cp[oldKey]; ok {
				cp[newKey] = v
				delete(cp, oldKey)
			}
		}
		out = append(out, cp)
	}
	return out
}

func init() {
	Register(componentNameDataOperations, NewDataOperationsComponent)
}