Files
ragflow/internal/harness/core/agent_loop_preempt.go
Yingfeng 956357b997 Feat: add harness-go framework —— agent core (#16045)
### What problem does this PR solve?

core module for agent layer built on top of graph engine #16039

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
2026-06-16 11:39:48 +08:00

188 lines
3.9 KiB
Go

package core
import (
"context"
"fmt"
"sync"
"time"
)
// preemptController owns turn-targeted preempt requests and Push critical sections.
type preemptController struct {
mu sync.Mutex
cond *sync.Cond
turnPhase preemptTurnPhase
turnID uint64
currentTC any
currentRunCtx context.Context
pushInFlight int
pending *preemptRequest
notify chan struct{}
closed bool
}
func newPreemptController() *preemptController {
c := &preemptController{notify: make(chan struct{}, 1)}
c.cond = sync.NewCond(&c.mu)
return c
}
func (c *preemptController) beginPlanningTurn() {
c.mu.Lock()
defer c.mu.Unlock()
c.requirePhaseLocked(preemptTurnIdle, "beginPlanningTurn")
c.requireNoPendingLocked("beginPlanningTurn")
c.turnID++
c.turnPhase = preemptTurnPlanning
c.currentRunCtx = nil
c.currentTC = nil
}
func (c *preemptController) abortPlanningTurn() *preemptRequest {
c.mu.Lock()
defer c.mu.Unlock()
c.requirePhaseLocked(preemptTurnPlanning, "abortPlanningTurn")
c.turnPhase = preemptTurnIdle
c.currentRunCtx = nil
c.currentTC = nil
req := c.pending
c.pending = nil
c.cond.Broadcast()
return req
}
func (c *preemptController) beginActiveTurn(ctx context.Context, tc any) {
c.mu.Lock()
defer c.mu.Unlock()
c.requirePhaseLocked(preemptTurnPlanning, "beginActiveTurn")
c.turnPhase = preemptTurnActive
c.currentRunCtx = ctx
c.currentTC = tc
if c.pending != nil {
c.notifyWatcherLocked()
}
}
func (c *preemptController) endActiveTurn() *preemptRequest {
c.mu.Lock()
defer c.mu.Unlock()
c.requirePhaseLocked(preemptTurnActive, "endActiveTurn")
c.turnPhase = preemptTurnIdle
c.currentRunCtx = nil
c.currentTC = nil
req := c.pending
c.pending = nil
c.cond.Broadcast()
return req
}
func (c *preemptController) requirePhaseLocked(expected preemptTurnPhase, op string) {
if c.turnPhase != expected {
panic(fmt.Sprintf("adk: preemptController.%s called while turn phase is %s; expected %s", op, c.turnPhase, expected))
}
}
func (c *preemptController) requireNoPendingLocked(op string) {
if c.pending != nil {
panic(fmt.Sprintf("adk: preemptController.%s called with stale pending preempt request", op))
}
}
func (c *preemptController) beginPush() preemptTurnSnapshot {
c.mu.Lock()
defer c.mu.Unlock()
c.pushInFlight++
return preemptTurnSnapshot{
hasTargetTurn: c.turnPhase == preemptTurnPlanning || c.turnPhase == preemptTurnActive,
turnID: c.turnID,
ctx: c.currentRunCtx,
tc: c.currentTC,
}
}
func (c *preemptController) endPush() {
c.mu.Lock()
defer c.mu.Unlock()
c.pushInFlight--
if c.pushInFlight < 0 {
panic("adk: preemptController.endPush called without matching beginPush")
}
c.cond.Broadcast()
}
func (c *preemptController) waitForPushes() {
c.mu.Lock()
defer c.mu.Unlock()
for c.pushInFlight > 0 {
c.cond.Wait()
}
}
func (c *preemptController) requestPreempt(target preemptTurnSnapshot, ack chan struct{}, opts ...CancelOption) {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed || !target.hasTargetTurn || c.turnPhase == preemptTurnIdle || c.turnID != target.turnID {
if ack != nil {
close(ack)
}
return
}
now := time.Now()
if c.pending == nil {
c.pending = newPreemptRequest(ack, opts, now)
} else {
c.pending.merge(ack, opts, now)
}
if c.turnPhase == preemptTurnActive {
c.notifyWatcherLocked()
}
}
func (c *preemptController) receivePreempt() (*preemptRequest, bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.turnPhase != preemptTurnActive || c.pending == nil {
return nil, false
}
req := c.pending
c.pending = nil
return req, true
}
func (c *preemptController) closeForLoopExit() {
c.mu.Lock()
c.closed = true
c.turnPhase = preemptTurnIdle
c.currentRunCtx = nil
c.currentTC = nil
req := c.pending
c.pending = nil
select {
case <-c.notify:
default:
}
c.cond.Broadcast()
c.mu.Unlock()
req.ack()
}
func (c *preemptController) notifyWatcherLocked() {
select {
case c.notify <- struct{}{}:
default:
}
}