Files
ragflow/internal/harness/graph/pregel/cache_async.go
Yingfeng 706e0d2d06 Refactor harness framework (#16271)
### What problem does this PR solve?

- Tools management
- Pregel engine wrapper for better usage
- UT race
- Coding style

### Type of change

- [x] Refactoring
2026-06-23 20:18:04 +08:00

260 lines
6.0 KiB
Go

// Package pregel provides asynchronous caching support for Pregel execution.
package pregel
import (
"context"
"time"
)
// AsyncCache extends the Cache interface with asynchronous operations.
type AsyncCache interface {
Cache
// AGet asynchronously retrieves a value from the cache.
// Returns a channel that will receive the result.
AGet(ctx context.Context, key string) <-chan CacheResult
// ASet asynchronously stores a value in the cache.
// Returns a channel that will be closed when the operation completes.
ASet(ctx context.Context, key string, value any, ttl time.Duration) <-chan error
// ADelete asynchronously removes a value from the cache.
// Returns a channel that will be closed when the operation completes.
ADelete(ctx context.Context, key string) <-chan error
}
// CacheResult represents the result of an asynchronous cache get operation.
type CacheResult struct {
Value any
Found bool
Error error
}
// AsyncMemoryCache is an asynchronous in-memory cache implementation.
type AsyncMemoryCache struct {
*MemoryCache
workerCh chan asyncCacheOp
stopCh chan struct{}
}
// asyncCacheOp represents an asynchronous cache operation.
type asyncCacheOp struct {
ctx context.Context
opType string // "get", "set", "delete"
key string
value any
ttl time.Duration
result chan<- CacheResult
done chan<- error
}
// NewAsyncMemoryCache creates a new asynchronous in-memory cache.
func NewAsyncMemoryCache(maxSize int, eviction EvictionPolicy, numWorkers int) *AsyncMemoryCache {
if numWorkers <= 0 {
numWorkers = 4
}
cache := &AsyncMemoryCache{
MemoryCache: NewMemoryCache(maxSize, eviction),
workerCh: make(chan asyncCacheOp, 1000),
stopCh: make(chan struct{}),
}
// Start worker goroutines
for i := 0; i < numWorkers; i++ {
go cache.worker()
}
return cache
}
// worker processes asynchronous cache operations.
func (c *AsyncMemoryCache) worker() {
for {
select {
case op := <-c.workerCh:
c.processOp(op)
case <-c.stopCh:
return
}
}
}
// processOp processes a single cache operation.
func (c *AsyncMemoryCache) processOp(op asyncCacheOp) {
switch op.opType {
case "get":
value, found := c.MemoryCache.Get(op.ctx, op.key)
if op.result != nil {
op.result <- CacheResult{Value: value, Found: found}
}
case "set":
c.MemoryCache.Set(op.ctx, op.key, op.value, op.ttl)
if op.done != nil {
op.done <- nil
}
case "delete":
c.MemoryCache.Delete(op.ctx, op.key)
if op.done != nil {
op.done <- nil
}
}
}
// AGet asynchronously retrieves a value from the cache.
func (c *AsyncMemoryCache) AGet(ctx context.Context, key string) <-chan CacheResult {
resultCh := make(chan CacheResult, 1)
select {
case c.workerCh <- asyncCacheOp{
ctx: ctx,
opType: "get",
key: key,
result: resultCh,
}:
case <-ctx.Done():
resultCh <- CacheResult{Error: ctx.Err()}
close(resultCh)
}
return resultCh
}
// ASet asynchronously stores a value in the cache.
func (c *AsyncMemoryCache) ASet(ctx context.Context, key string, value any, ttl time.Duration) <-chan error {
doneCh := make(chan error, 1)
select {
case c.workerCh <- asyncCacheOp{
ctx: ctx,
opType: "set",
key: key,
value: value,
ttl: ttl,
done: doneCh,
}:
case <-ctx.Done():
doneCh <- ctx.Err()
close(doneCh)
}
return doneCh
}
// ADelete asynchronously removes a value from the cache.
func (c *AsyncMemoryCache) ADelete(ctx context.Context, key string) <-chan error {
doneCh := make(chan error, 1)
select {
case c.workerCh <- asyncCacheOp{
ctx: ctx,
opType: "delete",
key: key,
done: doneCh,
}:
case <-ctx.Done():
doneCh <- ctx.Err()
close(doneCh)
}
return doneCh
}
// Stop stops the async cache workers.
func (c *AsyncMemoryCache) Stop() {
close(c.stopCh)
}
// AsyncCachePolicy configures async cache behavior.
type AsyncCachePolicy struct {
// KeyFunc generates the cache key.
KeyFunc func(context.Context, any) string
// TTL is the time-to-live for cached values.
TTL *time.Duration
// Async determines if operations should be async.
Async bool
}
// AsyncCachedExecutor wraps a function with async caching.
type AsyncCachedExecutor struct {
cache AsyncCache
cachePolicy *AsyncCachePolicy
}
// NewAsyncCachedExecutor creates a new async cached executor.
func NewAsyncCachedExecutor(cache AsyncCache, policy *AsyncCachePolicy) *AsyncCachedExecutor {
return &AsyncCachedExecutor{
cache: cache,
cachePolicy: policy,
}
}
// Execute executes a function with async caching.
func (e *AsyncCachedExecutor) Execute(
ctx context.Context,
nodeName string,
input any,
fn func(context.Context, any) (any, error),
) (any, error) {
// Generate cache key
var key string
if e.cachePolicy != nil && e.cachePolicy.KeyFunc != nil {
key = e.cachePolicy.KeyFunc(ctx, input)
} else {
key = GenerateCacheKey(nodeName, input)
}
// Check cache asynchronously
if e.cachePolicy != nil && e.cachePolicy.Async {
resultCh := e.cache.AGet(ctx, key)
select {
case result := <-resultCh:
if result.Error != nil {
return nil, result.Error
}
if result.Found {
return result.Value, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
// Synchronous fallback
if cached, ok := e.cache.Get(ctx, key); ok {
return cached, nil
}
}
// Execute function
result, err := fn(ctx, input)
if err != nil {
return nil, err
}
// Cache result asynchronously
var ttl time.Duration
if e.cachePolicy != nil && e.cachePolicy.TTL != nil {
ttl = *e.cachePolicy.TTL
}
if e.cachePolicy != nil && e.cachePolicy.Async {
// Fire and forget async set
e.cache.ASet(context.Background(), key, result, ttl)
} else {
e.cache.Set(ctx, key, result, ttl)
}
return result, nil
}
// WaitForPending waits for all pending async operations to complete.
func (e *AsyncCachedExecutor) WaitForPending(timeout time.Duration) bool {
// In a real implementation, this would track pending operations
// For now, just sleep briefly to allow operations to complete
time.Sleep(timeout)
return true
}