mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-30 07:51:10 +08:00
### What problem does this PR solve? core module for agent layer built on top of graph engine #16039 ### Type of change - [x] New Feature (non-breaking change which adds functionality)
261 lines
5.4 KiB
Go
261 lines
5.4 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// ---- AgentLoop main run loop and turn planning ----
|
|
|
|
func (l *AgentLoop[T]) planTurn(
|
|
ctx context.Context,
|
|
isResume bool,
|
|
items []T,
|
|
pr *agentLoopPendingResume[T],
|
|
) (*turnPlan[T], error) {
|
|
if !isResume {
|
|
result, err := l.config.GenInput(ctx, l, items)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result == nil {
|
|
return nil, errors.New("GenInputResult is nil")
|
|
}
|
|
if result.Input == nil {
|
|
return nil, errors.New("agent input is nil")
|
|
}
|
|
turnCtx := ctx
|
|
if result.RunCtx != nil {
|
|
turnCtx = result.RunCtx
|
|
}
|
|
return &turnPlan[T]{
|
|
turnCtx: turnCtx,
|
|
remaining: result.Remaining,
|
|
spec: &turnRunSpec[T]{
|
|
runCtx: result.RunCtx,
|
|
input: result.Input,
|
|
runOpts: result.RunOpts,
|
|
consumed: result.Consumed,
|
|
},
|
|
}, nil
|
|
}
|
|
if pr == nil {
|
|
return nil, errors.New("resume payload is nil")
|
|
}
|
|
if l.config.GenResume == nil {
|
|
return nil, errors.New("GenResume is required for resume")
|
|
}
|
|
resumeResult, err := l.config.GenResume(ctx, l, pr.interrupted, pr.unhandled, pr.newItems)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if resumeResult == nil {
|
|
return nil, errors.New("GenResumeResult is nil")
|
|
}
|
|
turnCtx := ctx
|
|
if resumeResult.RunCtx != nil {
|
|
turnCtx = resumeResult.RunCtx
|
|
}
|
|
return &turnPlan[T]{
|
|
turnCtx: turnCtx,
|
|
remaining: resumeResult.Remaining,
|
|
spec: &turnRunSpec[T]{
|
|
runCtx: resumeResult.RunCtx,
|
|
runOpts: resumeResult.RunOpts,
|
|
resumeParams: resumeResult.ResumeParams,
|
|
isResume: true,
|
|
consumed: resumeResult.Consumed,
|
|
resumeBytes: pr.resumeBytes,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func defaultTurnLoopOnAgentEvents[T any](_ context.Context, _ *TurnContext[T], events *AsyncIterator[*AgentEvent]) error {
|
|
for {
|
|
event, ok := events.Next()
|
|
if !ok {
|
|
break
|
|
}
|
|
if event.Err != nil {
|
|
return event.Err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (l *AgentLoop[T]) run(ctx context.Context) {
|
|
defer l.cleanup(ctx)
|
|
|
|
if err := l.tryLoadCheckpoint(ctx); err != nil {
|
|
l.runErr = err
|
|
return
|
|
}
|
|
|
|
// Monitor context cancellation: close the buffer so that a blocking
|
|
// Receive() unblocks.
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
l.buffer.Close()
|
|
case <-l.done:
|
|
}
|
|
}()
|
|
|
|
for {
|
|
if l.stopCtrl.isCommitted() {
|
|
return
|
|
}
|
|
|
|
isResume := false
|
|
var pr *agentLoopPendingResume[T]
|
|
var items []T
|
|
var pushBack []T
|
|
|
|
if l.pendingResume != nil {
|
|
isResume = true
|
|
pr = l.pendingResume
|
|
l.pendingResume = nil
|
|
|
|
l.preemptCtrl.waitForPushes()
|
|
pr.newItems = append(pr.newItems, l.buffer.TakeAll()...)
|
|
|
|
pushBack = make([]T, 0, len(pr.interrupted)+len(pr.unhandled)+len(pr.newItems))
|
|
pushBack = append(pushBack, pr.interrupted...)
|
|
pushBack = append(pushBack, pr.unhandled...)
|
|
pushBack = append(pushBack, pr.newItems...)
|
|
} else {
|
|
var first T
|
|
var ok bool
|
|
|
|
if idleFor := l.stopCtrl.idleDuration(); idleFor > 0 {
|
|
l.buffer.ClearWakeup()
|
|
idleTimer := time.NewTimer(idleFor)
|
|
cancelIdle := make(chan struct{})
|
|
go func() {
|
|
select {
|
|
case <-idleTimer.C:
|
|
l.commitStop()
|
|
case <-cancelIdle:
|
|
}
|
|
}()
|
|
|
|
first, ok = l.buffer.Receive()
|
|
|
|
// Drain the timer channel to avoid race with commitStop
|
|
if !idleTimer.Stop() {
|
|
select {
|
|
case <-idleTimer.C:
|
|
default:
|
|
}
|
|
}
|
|
close(cancelIdle)
|
|
|
|
if !ok && !l.buffer.IsClosed() {
|
|
if err := ctx.Err(); err != nil {
|
|
l.runErr = err
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
// If commitStop fired via idle timer, exit
|
|
if atomic.LoadInt32(&l.stopped) != 0 {
|
|
return
|
|
}
|
|
} else {
|
|
first, ok = l.buffer.Receive()
|
|
if !ok && l.stopCtrl.idleDuration() > 0 {
|
|
continue
|
|
}
|
|
}
|
|
|
|
if !ok {
|
|
if err := ctx.Err(); err != nil {
|
|
l.runErr = err
|
|
}
|
|
return
|
|
}
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
l.buffer.PushFront([]T{first})
|
|
l.runErr = err
|
|
return
|
|
}
|
|
|
|
if l.stopCtrl.isCommitted() {
|
|
l.buffer.PushFront([]T{first})
|
|
return
|
|
}
|
|
|
|
l.preemptCtrl.waitForPushes()
|
|
rest := l.buffer.TakeAll()
|
|
items = append([]T{first}, rest...)
|
|
pushBack = items
|
|
}
|
|
|
|
l.preemptCtrl.beginPlanningTurn()
|
|
abortPlanning := func() {
|
|
l.preemptCtrl.abortPlanningTurn().ack()
|
|
}
|
|
|
|
plan, err := l.planTurn(ctx, isResume, items, pr)
|
|
if err != nil {
|
|
abortPlanning()
|
|
if len(pushBack) > 0 {
|
|
l.buffer.PushFront(pushBack)
|
|
}
|
|
l.runErr = err
|
|
return
|
|
}
|
|
|
|
if l.stopCtrl.isCommitted() {
|
|
abortPlanning()
|
|
if len(pushBack) > 0 {
|
|
l.buffer.PushFront(pushBack)
|
|
}
|
|
return
|
|
}
|
|
|
|
agent, err := l.config.PrepareAgent(plan.turnCtx, l, plan.spec.consumed)
|
|
if err != nil {
|
|
abortPlanning()
|
|
if len(pushBack) > 0 {
|
|
l.buffer.PushFront(pushBack)
|
|
}
|
|
l.runErr = err
|
|
return
|
|
}
|
|
|
|
if l.stopCtrl.isCommitted() {
|
|
abortPlanning()
|
|
if len(pushBack) > 0 {
|
|
l.buffer.PushFront(pushBack)
|
|
}
|
|
return
|
|
}
|
|
|
|
l.buffer.PushFront(plan.remaining)
|
|
|
|
runErr := l.runAgentAndHandleEvents(plan.turnCtx, agent, plan.spec)
|
|
|
|
if runErr != nil {
|
|
if l.capturedCancelErr != nil || l.interruptContexts != nil {
|
|
// Assignment (not append) is intentional: only the interrupting
|
|
// turn's consumed items matter — the loop exits immediately after.
|
|
l.interruptedItems = append([]T{}, plan.spec.consumed...)
|
|
}
|
|
l.runErr = runErr
|
|
return
|
|
}
|
|
|
|
// Business interrupt: agent produced an Interrupted action
|
|
if l.interruptContexts != nil {
|
|
l.interruptedItems = append([]T{}, plan.spec.consumed...)
|
|
l.runErr = &InterruptError{InterruptContexts: l.interruptContexts}
|
|
return
|
|
}
|
|
}
|
|
}
|