Files
ragflow/internal/harness/core/agent_loop_run.go
Yingfeng 956357b997 Feat: add harness-go framework —— agent core (#16045)
### What problem does this PR solve?

Core module for the agent layer, built on top of the graph engine (#16039).

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
2026-06-16 11:39:48 +08:00

261 lines
5.4 KiB
Go

package core
import (
"context"
"errors"
"sync/atomic"
"time"
)
// ---- AgentLoop main run loop and turn planning ----
// planTurn builds the plan for the next turn of the loop.
//
// When isResume is true, the pending-resume payload pr is handed to
// config.GenResume, and the resulting spec is marked as a resume (it carries
// the resume parameters and pr.resumeBytes). Otherwise the buffered items are
// handed to config.GenInput, and the resulting spec carries the agent input.
//
// In both cases:
//   - the plan's turnCtx is the generator's RunCtx when it is non-nil, and
//     ctx otherwise;
//   - plan.remaining holds whatever the generator reported as Remaining;
//   - spec.consumed holds whatever it reported as Consumed.
//
// An error is returned when:
//   - the generator fails or returns a nil result;
//   - a fresh turn has a nil Input;
//   - a resume is requested without a payload or without a GenResume hook.
func (l *AgentLoop[T]) planTurn(
	ctx context.Context,
	isResume bool,
	items []T,
	pr *agentLoopPendingResume[T],
) (*turnPlan[T], error) {
	if isResume {
		switch {
		case pr == nil:
			return nil, errors.New("resume payload is nil")
		case l.config.GenResume == nil:
			return nil, errors.New("GenResume is required for resume")
		}

		res, genErr := l.config.GenResume(ctx, l, pr.interrupted, pr.unhandled, pr.newItems)
		if genErr != nil {
			return nil, genErr
		}
		if res == nil {
			return nil, errors.New("GenResumeResult is nil")
		}

		var effectiveCtx context.Context = ctx
		if res.RunCtx != nil {
			effectiveCtx = res.RunCtx
		}
		resumeSpec := &turnRunSpec[T]{
			runCtx:       res.RunCtx,
			runOpts:      res.RunOpts,
			resumeParams: res.ResumeParams,
			isResume:     true,
			consumed:     res.Consumed,
			resumeBytes:  pr.resumeBytes,
		}
		return &turnPlan[T]{
			turnCtx:   effectiveCtx,
			remaining: res.Remaining,
			spec:      resumeSpec,
		}, nil
	}

	gen, genErr := l.config.GenInput(ctx, l, items)
	switch {
	case genErr != nil:
		return nil, genErr
	case gen == nil:
		return nil, errors.New("GenInputResult is nil")
	case gen.Input == nil:
		return nil, errors.New("agent input is nil")
	}

	var effectiveCtx context.Context = ctx
	if gen.RunCtx != nil {
		effectiveCtx = gen.RunCtx
	}
	freshSpec := &turnRunSpec[T]{
		runCtx:   gen.RunCtx,
		input:    gen.Input,
		runOpts:  gen.RunOpts,
		consumed: gen.Consumed,
	}
	return &turnPlan[T]{
		turnCtx:   effectiveCtx,
		remaining: gen.Remaining,
		spec:      freshSpec,
	}, nil
}
// defaultTurnLoopOnAgentEvents is the fallback agent-event handler.
//
// It pulls events from the iterator until Next reports exhaustion. The first
// event carrying a non-nil Err stops consumption, and that error is returned
// as-is. If the stream ends without any event error, it returns nil. The
// context and turn context are ignored.
func defaultTurnLoopOnAgentEvents[T any](_ context.Context, _ *TurnContext[T], events *AsyncIterator[*AgentEvent]) error {
	for ev, more := events.Next(); more; ev, more = events.Next() {
		if ev.Err != nil {
			return ev.Err
		}
	}
	return nil
}
// run is the AgentLoop's main loop.
//
// It runs turns until one of the following happens:
//   - the stop controller commits;
//   - the idle timer triggers commitStop;
//   - ctx is cancelled;
//   - a turn fails or is interrupted.
//
// Terminal errors are recorded in l.runErr. cleanup(ctx) is deferred and
// always runs on exit.
//
// Each iteration either resumes the pending interrupted turn
// (l.pendingResume) or blocks on the input buffer for the next item. It then:
//  1. plans the turn (planTurn);
//  2. prepares the agent (config.PrepareAgent);
//  3. runs it (runAgentAndHandleEvents).
//
// If the loop bails out after taking items from the buffer but before the
// agent starts, those items are pushed back to the front of the buffer so
// they are not lost.
func (l *AgentLoop[T]) run(ctx context.Context) {
	defer l.cleanup(ctx)

	if err := l.tryLoadCheckpoint(ctx); err != nil {
		l.runErr = err
		return
	}

	// Monitor context cancellation: close the buffer so that a blocking
	// Receive() unblocks. The goroutine returns as soon as either ctx is done
	// or l.done becomes ready.
	go func() {
		select {
		case <-ctx.Done():
			l.buffer.Close()
		case <-l.done:
		}
	}()

	for {
		if l.stopCtrl.isCommitted() {
			return
		}

		isResume := false
		var pr *agentLoopPendingResume[T]
		var items []T
		// pushBack holds every item taken out of the buffer for this turn.
		// It is restored to the front of the buffer if the turn is abandoned
		// before the agent starts.
		var pushBack []T

		if l.pendingResume != nil {
			isResume = true
			pr = l.pendingResume
			l.pendingResume = nil
			l.preemptCtrl.waitForPushes()
			pr.newItems = append(pr.newItems, l.buffer.TakeAll()...)
			pushBack = make([]T, 0, len(pr.interrupted)+len(pr.unhandled)+len(pr.newItems))
			pushBack = append(pushBack, pr.interrupted...)
			pushBack = append(pushBack, pr.unhandled...)
			pushBack = append(pushBack, pr.newItems...)
		} else {
			var first T
			var ok bool
			if idleFor := l.stopCtrl.idleDuration(); idleFor > 0 {
				l.buffer.ClearWakeup()
				idleTimer := time.NewTimer(idleFor)
				cancelIdle := make(chan struct{})
				go func() {
					select {
					case <-idleTimer.C:
						l.commitStop()
					case <-cancelIdle:
					}
				}()
				first, ok = l.buffer.Receive()
				// Drain the timer channel to avoid race with commitStop.
				if !idleTimer.Stop() {
					select {
					case <-idleTimer.C:
					default:
					}
				}
				close(cancelIdle)
				// NOTE(review): the idle goroutine is not joined here. If it
				// already took from idleTimer.C, commitStop may still be
				// running when l.stopped is read below.
				if !ok && !l.buffer.IsClosed() {
					if err := ctx.Err(); err != nil {
						l.runErr = err
						return
					}
					continue
				}
				// If commitStop fired via the idle timer, exit. Receive may
				// still have handed us an item; put it back instead of
				// dropping it, mirroring the committed-stop path below.
				if atomic.LoadInt32(&l.stopped) != 0 {
					if ok {
						l.buffer.PushFront([]T{first})
					}
					return
				}
			} else {
				first, ok = l.buffer.Receive()
				// The idle duration may have been configured while we were
				// blocked; retry so the idle path takes over.
				if !ok && l.stopCtrl.idleDuration() > 0 {
					continue
				}
			}

			if !ok {
				if err := ctx.Err(); err != nil {
					l.runErr = err
				}
				return
			}
			if err := ctx.Err(); err != nil {
				l.buffer.PushFront([]T{first})
				l.runErr = err
				return
			}
			if l.stopCtrl.isCommitted() {
				l.buffer.PushFront([]T{first})
				return
			}
			l.preemptCtrl.waitForPushes()
			rest := l.buffer.TakeAll()
			items = append([]T{first}, rest...)
			pushBack = items
		}

		l.preemptCtrl.beginPlanningTurn()
		// abandonTurn aborts the planning phase and returns this turn's items
		// to the front of the buffer. It is used on every exit between
		// planning and starting the agent.
		abandonTurn := func() {
			l.preemptCtrl.abortPlanningTurn().ack()
			if len(pushBack) > 0 {
				l.buffer.PushFront(pushBack)
			}
		}

		plan, err := l.planTurn(ctx, isResume, items, pr)
		if err != nil {
			abandonTurn()
			l.runErr = err
			return
		}
		if l.stopCtrl.isCommitted() {
			abandonTurn()
			return
		}

		agent, err := l.config.PrepareAgent(plan.turnCtx, l, plan.spec.consumed)
		if err != nil {
			abandonTurn()
			l.runErr = err
			return
		}
		if l.stopCtrl.isCommitted() {
			abandonTurn()
			return
		}

		// Items the generator did not consume go back to the buffer for a
		// later turn.
		l.buffer.PushFront(plan.remaining)
		runErr := l.runAgentAndHandleEvents(plan.turnCtx, agent, plan.spec)
		if runErr != nil {
			if l.capturedCancelErr != nil || l.interruptContexts != nil {
				// Overwrite (rather than extend) with a copy of this turn's
				// consumed items: only the interrupting turn's items matter,
				// because the loop exits immediately after.
				l.interruptedItems = append([]T{}, plan.spec.consumed...)
			}
			l.runErr = runErr
			return
		}

		// Business interrupt: the agent produced an Interrupted action.
		if l.interruptContexts != nil {
			l.interruptedItems = append([]T{}, plan.spec.consumed...)
			l.runErr = &InterruptError{InterruptContexts: l.interruptContexts}
			return
		}
	}
}