mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-30 07:51:10 +08:00
### What problem does this PR solve? core module for agent layer built on top of graph engine #16039 ### Type of change - [x] New Feature (non-breaking change which adds functionality)
144 lines
3.1 KiB
Go
144 lines
3.1 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// ---- AgentLoop push operations ----
|
|
|
|
func (l *AgentLoop[T]) appendLate(item T) {
|
|
l.lateMu.Lock()
|
|
defer l.lateMu.Unlock()
|
|
if l.lateSealed {
|
|
panic("AgentLoop: Push called after TakeLateItems")
|
|
}
|
|
l.lateItems = append(l.lateItems, item)
|
|
}
|
|
|
|
// Push adds an item to the loop's buffer for processing.
|
|
// Returns false if the loop has stopped. When preemptive, returns an ack channel.
|
|
func (l *AgentLoop[T]) Push(item T, opts ...PushOption[T]) (bool, <-chan struct{}) {
|
|
cfg := &pushConfig[T]{}
|
|
for _, opt := range opts {
|
|
opt(cfg)
|
|
}
|
|
|
|
if cfg.pushStrategy != nil {
|
|
return l.pushWithStrategy(item, cfg)
|
|
}
|
|
|
|
return l.pushWithConfig(item, cfg)
|
|
}
|
|
|
|
// pushWithStrategy snapshots the current target turn while the strategy decides
|
|
// how to enqueue the item.
|
|
//
|
|
// When the loop is idle (no active turn), snapshot.ctx is nil and the strategy
|
|
// receives context.TODO() — it cannot observe caller cancellation or deadlines
|
|
// at that point. If the strategy needs the caller's context, use the Push overload
|
|
// that accepts ctx (not yet available; pass via closure instead).
|
|
func (l *AgentLoop[T]) pushWithStrategy(item T, cfg *pushConfig[T]) (bool, <-chan struct{}) {
|
|
strategy := cfg.pushStrategy
|
|
|
|
snapshot := l.preemptCtrl.beginPush()
|
|
defer l.preemptCtrl.endPush()
|
|
|
|
runCtx := snapshot.ctx
|
|
if runCtx == nil {
|
|
runCtx = context.TODO()
|
|
}
|
|
var tc *TurnContext[T]
|
|
if snapshot.tc != nil {
|
|
tc = snapshot.tc.(*TurnContext[T])
|
|
}
|
|
realOpts := strategy(runCtx, tc)
|
|
cfg = &pushConfig[T]{}
|
|
for _, opt := range realOpts {
|
|
opt(cfg)
|
|
}
|
|
cfg.pushStrategy = nil
|
|
|
|
if !cfg.preempt {
|
|
if !l.buffer.TrySend(item) {
|
|
l.appendLate(item)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
if atomic.LoadInt32(&l.stopped) != 0 {
|
|
l.appendLate(item)
|
|
return false, nil
|
|
}
|
|
|
|
if !l.buffer.TrySend(item) {
|
|
l.appendLate(item)
|
|
return false, nil
|
|
}
|
|
|
|
ack := make(chan struct{})
|
|
if atomic.LoadInt32(&l.started) == 0 {
|
|
close(ack)
|
|
return true, ack
|
|
}
|
|
|
|
if cfg.preemptDelay > 0 {
|
|
go func() {
|
|
select {
|
|
case <-time.After(cfg.preemptDelay):
|
|
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
|
|
case <-l.done:
|
|
close(ack)
|
|
}
|
|
}()
|
|
} else {
|
|
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
|
|
}
|
|
return true, ack
|
|
}
|
|
|
|
func (l *AgentLoop[T]) pushWithConfig(item T, cfg *pushConfig[T]) (bool, <-chan struct{}) {
|
|
if atomic.LoadInt32(&l.stopped) != 0 {
|
|
l.appendLate(item)
|
|
return false, nil
|
|
}
|
|
|
|
if cfg.preempt {
|
|
snapshot := l.preemptCtrl.beginPush()
|
|
defer l.preemptCtrl.endPush()
|
|
|
|
if !l.buffer.TrySend(item) {
|
|
l.appendLate(item)
|
|
return false, nil
|
|
}
|
|
|
|
ack := make(chan struct{})
|
|
if atomic.LoadInt32(&l.started) == 0 {
|
|
close(ack)
|
|
return true, ack
|
|
}
|
|
|
|
if cfg.preemptDelay > 0 {
|
|
go func() {
|
|
select {
|
|
case <-time.After(cfg.preemptDelay):
|
|
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
|
|
case <-l.done:
|
|
close(ack)
|
|
}
|
|
}()
|
|
} else {
|
|
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
|
|
}
|
|
return true, ack
|
|
}
|
|
|
|
if !l.buffer.TrySend(item) {
|
|
l.appendLate(item)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}
|