Files
ragflow/internal/harness/core/agent_loop_push.go
Yingfeng 956357b997 Feat: add harness-go framework —— agent core (#16045)
### What problem does this PR solve?

core module for agent layer built on top of graph engine #16039

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
2026-06-16 11:39:48 +08:00

144 lines
3.1 KiB
Go

package core
import (
"context"
"sync/atomic"
"time"
)
// ---- AgentLoop push operations ----
func (l *AgentLoop[T]) appendLate(item T) {
l.lateMu.Lock()
defer l.lateMu.Unlock()
if l.lateSealed {
panic("AgentLoop: Push called after TakeLateItems")
}
l.lateItems = append(l.lateItems, item)
}
// Push adds an item to the loop's buffer for processing.
// Returns false if the loop has stopped. When preemptive, returns an ack channel.
func (l *AgentLoop[T]) Push(item T, opts ...PushOption[T]) (bool, <-chan struct{}) {
cfg := &pushConfig[T]{}
for _, opt := range opts {
opt(cfg)
}
if cfg.pushStrategy != nil {
return l.pushWithStrategy(item, cfg)
}
return l.pushWithConfig(item, cfg)
}
// pushWithStrategy snapshots the current target turn while the strategy decides
// how to enqueue the item.
//
// When the loop is idle (no active turn), snapshot.ctx is nil and the strategy
// receives context.TODO() — it cannot observe caller cancellation or deadlines
// at that point. If the strategy needs the caller's context, use the Push overload
// that accepts ctx (not yet available; pass via closure instead).
func (l *AgentLoop[T]) pushWithStrategy(item T, cfg *pushConfig[T]) (bool, <-chan struct{}) {
strategy := cfg.pushStrategy
snapshot := l.preemptCtrl.beginPush()
defer l.preemptCtrl.endPush()
runCtx := snapshot.ctx
if runCtx == nil {
runCtx = context.TODO()
}
var tc *TurnContext[T]
if snapshot.tc != nil {
tc = snapshot.tc.(*TurnContext[T])
}
realOpts := strategy(runCtx, tc)
cfg = &pushConfig[T]{}
for _, opt := range realOpts {
opt(cfg)
}
cfg.pushStrategy = nil
if !cfg.preempt {
if !l.buffer.TrySend(item) {
l.appendLate(item)
return false, nil
}
return true, nil
}
if atomic.LoadInt32(&l.stopped) != 0 {
l.appendLate(item)
return false, nil
}
if !l.buffer.TrySend(item) {
l.appendLate(item)
return false, nil
}
ack := make(chan struct{})
if atomic.LoadInt32(&l.started) == 0 {
close(ack)
return true, ack
}
if cfg.preemptDelay > 0 {
go func() {
select {
case <-time.After(cfg.preemptDelay):
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
case <-l.done:
close(ack)
}
}()
} else {
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
}
return true, ack
}
func (l *AgentLoop[T]) pushWithConfig(item T, cfg *pushConfig[T]) (bool, <-chan struct{}) {
if atomic.LoadInt32(&l.stopped) != 0 {
l.appendLate(item)
return false, nil
}
if cfg.preempt {
snapshot := l.preemptCtrl.beginPush()
defer l.preemptCtrl.endPush()
if !l.buffer.TrySend(item) {
l.appendLate(item)
return false, nil
}
ack := make(chan struct{})
if atomic.LoadInt32(&l.started) == 0 {
close(ack)
return true, ack
}
if cfg.preemptDelay > 0 {
go func() {
select {
case <-time.After(cfg.preemptDelay):
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
case <-l.done:
close(ack)
}
}()
} else {
l.preemptCtrl.requestPreempt(snapshot, ack, cfg.agentCancelOpts...)
}
return true, ack
}
if !l.buffer.TrySend(item) {
l.appendLate(item)
return false, nil
}
return true, nil
}