mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-05 10:58:34 +08:00
464 lines
15 KiB
Go
464 lines
15 KiB
Go
// Package harness is the main package for Agent Harness Go.
|
|
//
|
|
// Agent Harness is a framework for building stateful, multi-agent
|
|
// applications with LLMs. It provides a graph-based execution model
|
|
// that supports:
|
|
//
|
|
// - Stateful computation with channels and reducers
|
|
// - Multi-agent workflows with subgraphs
|
|
// - Human-in-the-loop with interrupts
|
|
// - Persistence with checkpoints
|
|
// - Streaming and debugging
|
|
//
|
|
// Basic Usage:
|
|
//
|
|
// import (
|
|
// "context"
|
|
// "ragflow/internal/harness"
|
|
// "ragflow/internal/harness/graph/channels"
|
|
// )
|
|
//
|
|
// // Define state schema
|
|
// type State struct {
|
|
// Messages []string
|
|
// Counter int
|
|
// }
|
|
//
|
|
// // Create graph
|
|
// builder := harness.NewStateGraph(State{})
|
|
//
|
|
// // Add nodes
|
|
// builder.AddNode("agent", func(ctx context.Context, state interface{}) (interface{}, error) {
|
|
// s := state.(State)
|
|
// s.Messages = append(s.Messages, "Hello from agent")
|
|
// s.Counter++
|
|
// return s, nil
|
|
// })
|
|
//
|
|
// // Add edges
|
|
// builder.AddEdge("__start__", "agent")
|
|
// builder.AddEdge("agent", "__end__")
|
|
//
|
|
// // Compile and run
|
|
// graph, err := builder.Compile()
|
|
// if err != nil {
|
|
// log.Fatal(err)
|
|
// }
|
|
//
|
|
// result, err := graph.Invoke(context.Background(), State{
|
|
// Messages: []string{"Hello"},
|
|
// Counter: 0,
|
|
// })
|
|
//
|
|
// For more examples and documentation, visit:
|
|
// https://ragflow/internal/harness
|
|
package harness
|
|
|
|
import (
|
|
"ragflow/internal/harness/core"
|
|
"ragflow/internal/harness/graph/channels"
|
|
"ragflow/internal/harness/graph/checkpoint"
|
|
"ragflow/internal/harness/graph/constants"
|
|
"ragflow/internal/harness/graph/errors"
|
|
"ragflow/internal/harness/graph/graph"
|
|
"ragflow/internal/harness/graph/interrupt"
|
|
_ "ragflow/internal/harness/graph/pregel" // triggers pregel.init() → sets PregelRunFunc
|
|
"ragflow/internal/harness/graph/types"
|
|
"ragflow/internal/harness/prebuilt"
|
|
)
|
|
|
|
// Re-export main types for convenience.
|
|
type (
|
|
// StateGraph is a graph whose nodes communicate by reading and writing to a shared state.
|
|
StateGraph = types.StateGraph
|
|
|
|
// CompiledGraph is a compiled, executable graph.
|
|
CompiledGraph = types.CompiledGraph
|
|
|
|
// Node represents a node in the graph.
|
|
Node = types.Node
|
|
|
|
// Edge represents an edge in the graph.
|
|
Edge = types.Edge
|
|
|
|
// Send represents a dynamic node invocation.
|
|
Send = types.Send
|
|
|
|
// Checkpointer is the interface for checkpoint savers.
|
|
Checkpointer = checkpoint.BaseCheckpointer
|
|
|
|
// MemorySaver is an in-memory checkpoint saver.
|
|
MemorySaver = checkpoint.MemorySaver
|
|
|
|
// NATSSaver is a NATS JetStream-based checkpoint saver.
|
|
NATSSaver = checkpoint.NATSSaver
|
|
// NATSConfig holds configuration for the NATS checkpoint saver.
|
|
NATSConfig = checkpoint.NATSConfig
|
|
|
|
// Channel is the base interface for all channels.
|
|
Channel = channels.Channel
|
|
|
|
// BaseChannel provides a base implementation of Channel.
|
|
BaseChannel = channels.BaseChannel
|
|
|
|
// LastValue stores the last value received.
|
|
LastValue = channels.LastValue
|
|
|
|
// Topic is a configurable PubSub Topic.
|
|
Topic = channels.Topic
|
|
|
|
// BinaryOperatorAggregate stores the result of applying a binary operator.
|
|
BinaryOperatorAggregate = channels.BinaryOperatorAggregate
|
|
|
|
// BinaryOperator is a function that combines two values into one.
|
|
BinaryOperator = channels.BinaryOperator
|
|
|
|
// EphemeralValue stores a value that is cleared after being read once.
|
|
EphemeralValue = channels.EphemeralValue
|
|
|
|
// NamedBarrierValue waits until all named nodes have written a value.
|
|
NamedBarrierValue = channels.NamedBarrierValue
|
|
|
|
// NamedBarrierValueAfterFinish waits for all named nodes, available only after finish.
|
|
NamedBarrierValueAfterFinish = channels.NamedBarrierValueAfterFinish
|
|
|
|
// LastValueAfterFinish stores last value, available only after finish.
|
|
LastValueAfterFinish = channels.LastValueAfterFinish
|
|
|
|
// UntrackedValue stores a value but does not track it for checkpointing.
|
|
UntrackedValue = channels.UntrackedValue
|
|
|
|
// AnyValue stores any value received.
|
|
AnyValue = channels.AnyValue
|
|
|
|
// RunnableConfig is the configuration for a runnable.
|
|
RunnableConfig = types.RunnableConfig
|
|
|
|
// StreamMode defines how the stream method should emit outputs.
|
|
StreamMode = types.StreamMode
|
|
|
|
// RetryPolicy configures retrying nodes.
|
|
RetryPolicy = types.RetryPolicy
|
|
|
|
// CachePolicy configures caching nodes.
|
|
CachePolicy = types.CachePolicy
|
|
|
|
// Command is used to update the graph's state and send messages to nodes.
|
|
Command = types.Command
|
|
|
|
// Interrupt represents information about an interrupt.
|
|
Interrupt = types.Interrupt
|
|
|
|
// NodeFunc is the signature of a node function.
|
|
NodeFunc = types.NodeFunc
|
|
|
|
// EdgeFunc is the signature of an edge/condition function.
|
|
EdgeFunc = types.EdgeFunc
|
|
|
|
// StreamWriter writes data to the output stream.
|
|
StreamWriter = types.StreamWriter
|
|
|
|
// Prebuilt types
|
|
ReactAgentConfig = prebuilt.ReactAgentConfig
|
|
ReActState = prebuilt.ReActState
|
|
Tool = prebuilt.Tool
|
|
ToolCall = prebuilt.ToolCall
|
|
LLM = prebuilt.LLM
|
|
)
|
|
|
|
// AgentCore types (selectively re-exported).
|
|
// Generic types like Model[M] and RunnerConfig[M] must be imported directly.
|
|
type (
|
|
// Agent is the core agent interface (Message type).
|
|
Agent = core.Agent
|
|
// ResumableAgent supports interrupt/resume.
|
|
ResumableAgent = core.ResumableAgent
|
|
// Runner executes agents.
|
|
Runner = core.Runner
|
|
// AgentEvent represents an event during agent execution.
|
|
AgentEvent = core.AgentEvent
|
|
// AgentAction represents actions an agent can emit.
|
|
AgentAction = core.AgentAction
|
|
// AgentInput is the input to an agent.
|
|
AgentInput = core.AgentInput
|
|
// AgentOutput is the output from an agent event.
|
|
AgentOutput = core.AgentOutput
|
|
// RunOption configures agent execution.
|
|
RunOption = core.RunOption
|
|
// InterruptInfo holds interrupt metadata.
|
|
InterruptInfo = core.InterruptInfo
|
|
// InterruptCtx provides structured interrupt context.
|
|
InterruptCtx = core.InterruptCtx
|
|
// InterruptSignal is the internal interrupt signal.
|
|
InterruptSignal = core.InterruptSignal
|
|
// CancelMode defines when an agent should be canceled.
|
|
CancelMode = core.CancelMode
|
|
// CancelError indicates an agent was canceled.
|
|
CancelError = core.CancelError
|
|
// CancelHandle allows waiting for cancel completion.
|
|
CancelHandle = core.CancelHandle
|
|
// AgentCancelFunc cancels a running agent.
|
|
AgentCancelFunc = core.AgentCancelFunc
|
|
// BaseTool provides a simple Tool implementation.
|
|
BaseTool = core.BaseTool
|
|
// ToolContext provides tool metadata.
|
|
ToolContext = core.ToolContext
|
|
// ReActAgentState holds agent state for middlewares.
|
|
ReActAgentState = core.ReActAgentState
|
|
// ReActAgentContext is passed to BeforeAgent middlewares.
|
|
ReActAgentContext = core.ReActAgentContext
|
|
// ModelContext wraps model call context.
|
|
ModelContext = core.ModelContext
|
|
// CheckPointStore persists execution checkpoints.
|
|
CheckPointStore = core.CheckPointStore
|
|
// ReActMiddleware allows customizing agent behavior.
|
|
ReActMiddleware = core.ReActMiddleware
|
|
// Workflow types
|
|
SequentialConfig = core.SequentialConfig
|
|
ParallelConfig = core.ParallelConfig
|
|
LoopConfig = core.LoopConfig
|
|
)
|
|
|
|
// Cancel constants.
|
|
const (
|
|
CancelImmediate = core.CancelImmediate
|
|
CancelAfterChatModel = core.CancelAfterChatModel
|
|
CancelAfterToolCalls = core.CancelAfterToolCalls
|
|
)
|
|
|
|
// AgentCore functions.
|
|
var (
|
|
// NewRunner creates an agent Runner (Message type).
|
|
NewRunner = core.NewRunner
|
|
// NewAgentTool wraps an Agent as a Tool.
|
|
NewAgentTool = core.NewAgentTool
|
|
// NewSequential creates a sequential workflow agent.
|
|
NewSequential = core.NewSequential
|
|
// NewParallel creates a parallel workflow agent.
|
|
NewParallel = core.NewParallel
|
|
// NewLoop creates a loop workflow agent.
|
|
NewLoop = core.NewLoop
|
|
// SetSubAgents configures sub-agents.
|
|
SetSubAgents = core.SetSubAgents
|
|
// WithCancel creates a cancel option and cancel function.
|
|
WithCancel = core.WithCancel
|
|
|
|
// Run option constructors
|
|
WithSessionValues = core.WithSessionValues
|
|
WithCheckPointID = core.WithCheckPointID
|
|
WithSkipTransferMessages = core.WithSkipTransferMessages
|
|
WithCallbacks = core.WithCallbacks
|
|
WithAgentNames = core.WithAgentNames
|
|
WithSharedParentSession = core.WithSharedParentSession
|
|
WithChatModelOptions = core.WithChatModelOptions
|
|
WithToolOptions = core.WithToolOptions
|
|
WithAgentToolOptions = core.WithAgentToolOptions
|
|
WithHistoryModifier = core.WithHistoryModifier
|
|
WithCancelMode = core.WithCancelMode
|
|
WithCancelTimeout = core.WithCancelTimeout
|
|
WithRecursiveCancel = core.WithRecursiveCancel
|
|
|
|
// Event helpers
|
|
StatefulInterrupt = core.StatefulInterrupt
|
|
CompositeInterrupt = core.CompositeInterrupt
|
|
SendEvent = core.SendEvent
|
|
SetRunLocalValue = core.SetRunLocalValue
|
|
GetRunLocalValue = core.GetRunLocalValue
|
|
DeleteRunLocalValue = core.DeleteRunLocalValue
|
|
|
|
// Transfer and middleware
|
|
AgentWithOptions = core.AgentWithOptions
|
|
AgentWithDeterministicTransfer = core.AgentWithDeterministicTransfer
|
|
SetLanguage = core.SetLanguage
|
|
|
|
// Errors
|
|
ErrCancelTimeout = core.ErrCancelTimeout
|
|
ErrExecutionEnded = core.ErrExecutionEnded
|
|
ErrStreamCanceled = core.ErrStreamCanceled
|
|
)
|
|
|
|
// Prebuilt component functions.
|
|
var (
|
|
// NewReactAgent creates a new ReAct (Reasoning + Acting) agent.
|
|
NewReactAgent = prebuilt.NewReactAgent
|
|
// ToolNode creates a node that executes a tool.
|
|
ToolNode = prebuilt.ToolNode
|
|
// ValidationNode creates a node that validates input.
|
|
ValidationNode = prebuilt.ValidationNode
|
|
// ConditionalNode creates a node that routes based on a condition.
|
|
ConditionalNode = prebuilt.ConditionalNode
|
|
// TransformNode creates a node that transforms input.
|
|
TransformNode = prebuilt.TransformNode
|
|
)
|
|
|
|
// Re-export constants.
|
|
const (
|
|
// Start is the first (virtual) node in the graph.
|
|
Start = constants.Start
|
|
|
|
// End is the last (virtual) node in the graph.
|
|
End = constants.End
|
|
|
|
// TagNoStream is a tag to disable streaming.
|
|
TagNoStream = constants.TagNoStream
|
|
|
|
// TagHidden is a tag to hide a node/edge from tracing.
|
|
TagHidden = constants.TagHidden
|
|
)
|
|
|
|
// Re-export stream modes.
|
|
const (
|
|
StreamModeValues = types.StreamModeValues
|
|
StreamModeUpdates = types.StreamModeUpdates
|
|
StreamModeCustom = types.StreamModeCustom
|
|
StreamModeMessages = types.StreamModeMessages
|
|
StreamModeCheckpoints = types.StreamModeCheckpoints
|
|
StreamModeTasks = types.StreamModeTasks
|
|
StreamModeDebug = types.StreamModeDebug
|
|
)
|
|
|
|
// Re-export error types.
|
|
type (
|
|
GraphRecursionError = errors.GraphRecursionError
|
|
InvalidUpdateError = errors.InvalidUpdateError
|
|
GraphInterrupt = errors.GraphInterrupt
|
|
EmptyChannelError = errors.EmptyChannelError
|
|
EmptyInputError = errors.EmptyInputError
|
|
NodeNotFoundError = errors.NodeNotFoundError
|
|
InvalidNodeError = errors.InvalidNodeError
|
|
InvalidEdgeError = errors.InvalidEdgeError
|
|
ChannelNotFoundError = errors.ChannelNotFoundError
|
|
)
|
|
|
|
// NewStateGraph creates a new StateGraph with the given state schema.
|
|
func NewStateGraph(stateSchema interface{}) StateGraph {
|
|
return graph.NewStateGraph(stateSchema)
|
|
}
|
|
|
|
// NewMemorySaver creates a new in-memory checkpoint saver.
|
|
func NewMemorySaver() *MemorySaver {
|
|
return checkpoint.NewMemorySaver()
|
|
}
|
|
|
|
// Compile options.
|
|
var (
|
|
// WithCheckpointer sets the checkpointer for the compiled graph.
|
|
WithCheckpointer = graph.WithCheckpointer
|
|
|
|
// WithInterrupts sets the nodes that should trigger interrupts.
|
|
WithInterrupts = graph.WithInterrupts
|
|
|
|
// WithInterruptsAfter sets the nodes that should trigger interrupts AFTER execution.
|
|
WithInterruptsAfter = graph.WithInterruptsAfter
|
|
|
|
// WithRecursionLimit sets the recursion limit.
|
|
WithRecursionLimit = graph.WithRecursionLimit
|
|
|
|
// WithDebug enables debug mode.
|
|
WithDebug = graph.WithDebug
|
|
|
|
// WithNodeTriggerMode sets the node trigger mode (AnyPredecessor / AllPredecessor).
|
|
WithNodeTriggerMode = graph.WithNodeTriggerMode
|
|
)
|
|
|
|
// Loop / Parallel option constructors.
|
|
var (
|
|
// WithLoopMaxIterations sets the maximum number of iterations for a Loop node.
|
|
WithLoopMaxIterations = graph.WithLoopMaxIterations
|
|
// WithLoopCheckpointIDPrefix sets a stable checkpoint prefix for loop iterations.
|
|
WithLoopCheckpointIDPrefix = graph.WithLoopCheckpointIDPrefix
|
|
// WithParallelMaxConcurrency sets the maximum concurrency for a Parallel node.
|
|
WithParallelMaxConcurrency = graph.WithParallelMaxConcurrency
|
|
)
|
|
|
|
// Loop / Parallel node constructor functions.
|
|
var (
|
|
// NewLoopNodeFunc creates a NodeFunc that repeatedly executes a sub-graph.
|
|
NewLoopNodeFunc = graph.NewLoopNodeFunc
|
|
// NewParallelNodeFunc creates a NodeFunc that fans out items to a sub-graph.
|
|
NewParallelNodeFunc = graph.NewParallelNodeFunc
|
|
)
|
|
|
|
// LoopCondition is the per-iteration exit predicate for loop nodes.
|
|
type LoopCondition = graph.LoopCondition
|
|
|
|
// Interrupt functions.
|
|
var (
|
|
// InterruptFunc interrupts the graph with a resumable exception.
|
|
InterruptFunc = interrupt.Interrupt
|
|
|
|
// IsInterrupt checks if an error is a GraphInterrupt.
|
|
IsInterrupt = interrupt.IsInterrupt
|
|
|
|
// GetInterruptValue extracts the interrupt value from a GraphInterrupt error.
|
|
GetInterruptValue = interrupt.GetInterruptValue
|
|
)
|
|
|
|
// Channel constructors.
|
|
var (
|
|
// NewLastValue creates a new LastValue channel.
|
|
NewLastValue = channels.NewLastValue
|
|
|
|
// NewTopic creates a new Topic channel.
|
|
NewTopic = channels.NewTopic
|
|
|
|
// NewBinaryOperatorAggregate creates a new BinaryOperatorAggregate channel.
|
|
NewBinaryOperatorAggregate = channels.NewBinaryOperatorAggregate
|
|
|
|
// NewEphemeralValue creates a new EphemeralValue channel.
|
|
NewEphemeralValue = channels.NewEphemeralValue
|
|
|
|
// NewNamedBarrierValue creates a new NamedBarrierValue channel.
|
|
NewNamedBarrierValue = channels.NewNamedBarrierValue
|
|
|
|
// NewNamedBarrierValueAfterFinish creates a new NamedBarrierValueAfterFinish channel.
|
|
NewNamedBarrierValueAfterFinish = channels.NewNamedBarrierValueAfterFinish
|
|
|
|
// NewLastValueAfterFinish creates a new LastValueAfterFinish channel.
|
|
NewLastValueAfterFinish = channels.NewLastValueAfterFinish
|
|
|
|
// NewUntrackedValue creates a new UntrackedValue channel.
|
|
NewUntrackedValue = channels.NewUntrackedValue
|
|
|
|
// NewAnyValue creates a new AnyValue channel.
|
|
NewAnyValue = channels.NewAnyValue
|
|
)
|
|
|
|
// BinaryOperator functions.
|
|
var (
|
|
// ListAppend appends two lists.
|
|
ListAppend = channels.ListAppend
|
|
|
|
// IntAdd adds two integers.
|
|
IntAdd = channels.IntAdd
|
|
|
|
// StringConcat concatenates two strings.
|
|
StringConcat = channels.StringConcat
|
|
)
|
|
|
|
// DefaultRetryPolicy returns a default retry policy.
|
|
func DefaultRetryPolicy() RetryPolicy {
|
|
return types.DefaultRetryPolicy()
|
|
}
|
|
|
|
// NewRunnableConfig creates a new RunnableConfig.
|
|
func NewRunnableConfig() *RunnableConfig {
|
|
return types.NewRunnableConfig()
|
|
}
|
|
|
|
// NewCommand creates a new Command.
|
|
func NewCommand() *Command {
|
|
return types.NewCommand()
|
|
}
|
|
|
|
// NewSend creates a new graph.Send (used for map-reduce style Pregel operations).
|
|
// NOTE: This returns *graph.Send, which is distinct from *types.Send.
|
|
// Users of the types package should use types.NewSend directly.
|
|
func NewSend(node string, arg interface{}) *Send {
|
|
return &Send{Node: node, Arg: arg}
|
|
}
|
|
|
|
// Pregel engine wiring (init + runner) moved to
|
|
// internal/harness/graph/pregel/init_pregel.go so that any
|
|
// package importing pregel automatically activates it.
|