mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-05 02:55:48 +08:00
### What problem does this PR solve? - Tools management - Pregel engine wrapper for better usage - UT race - Coding style ### Type of change - [x] Refactoring
260 lines
6.0 KiB
Go
260 lines
6.0 KiB
Go
// Package pregel provides asynchronous caching support for Pregel execution.
|
|
package pregel
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
)
|
|
|
|
// AsyncCache extends the Cache interface with asynchronous operations.
|
|
type AsyncCache interface {
|
|
Cache
|
|
|
|
// AGet asynchronously retrieves a value from the cache.
|
|
// Returns a channel that will receive the result.
|
|
AGet(ctx context.Context, key string) <-chan CacheResult
|
|
|
|
// ASet asynchronously stores a value in the cache.
|
|
// Returns a channel that will be closed when the operation completes.
|
|
ASet(ctx context.Context, key string, value any, ttl time.Duration) <-chan error
|
|
|
|
// ADelete asynchronously removes a value from the cache.
|
|
// Returns a channel that will be closed when the operation completes.
|
|
ADelete(ctx context.Context, key string) <-chan error
|
|
}
|
|
|
|
// CacheResult represents the result of an asynchronous cache get operation.
|
|
type CacheResult struct {
|
|
Value any
|
|
Found bool
|
|
Error error
|
|
}
|
|
|
|
// AsyncMemoryCache is an asynchronous in-memory cache implementation.
|
|
type AsyncMemoryCache struct {
|
|
*MemoryCache
|
|
workerCh chan asyncCacheOp
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// asyncCacheOp represents an asynchronous cache operation.
|
|
type asyncCacheOp struct {
|
|
ctx context.Context
|
|
opType string // "get", "set", "delete"
|
|
key string
|
|
value any
|
|
ttl time.Duration
|
|
result chan<- CacheResult
|
|
done chan<- error
|
|
}
|
|
|
|
// NewAsyncMemoryCache creates a new asynchronous in-memory cache.
|
|
func NewAsyncMemoryCache(maxSize int, eviction EvictionPolicy, numWorkers int) *AsyncMemoryCache {
|
|
if numWorkers <= 0 {
|
|
numWorkers = 4
|
|
}
|
|
|
|
cache := &AsyncMemoryCache{
|
|
MemoryCache: NewMemoryCache(maxSize, eviction),
|
|
workerCh: make(chan asyncCacheOp, 1000),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
// Start worker goroutines
|
|
for i := 0; i < numWorkers; i++ {
|
|
go cache.worker()
|
|
}
|
|
|
|
return cache
|
|
}
|
|
|
|
// worker processes asynchronous cache operations.
|
|
func (c *AsyncMemoryCache) worker() {
|
|
for {
|
|
select {
|
|
case op := <-c.workerCh:
|
|
c.processOp(op)
|
|
case <-c.stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// processOp processes a single cache operation.
|
|
func (c *AsyncMemoryCache) processOp(op asyncCacheOp) {
|
|
switch op.opType {
|
|
case "get":
|
|
value, found := c.MemoryCache.Get(op.ctx, op.key)
|
|
if op.result != nil {
|
|
op.result <- CacheResult{Value: value, Found: found}
|
|
}
|
|
case "set":
|
|
c.MemoryCache.Set(op.ctx, op.key, op.value, op.ttl)
|
|
if op.done != nil {
|
|
op.done <- nil
|
|
}
|
|
case "delete":
|
|
c.MemoryCache.Delete(op.ctx, op.key)
|
|
if op.done != nil {
|
|
op.done <- nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// AGet asynchronously retrieves a value from the cache.
|
|
func (c *AsyncMemoryCache) AGet(ctx context.Context, key string) <-chan CacheResult {
|
|
resultCh := make(chan CacheResult, 1)
|
|
|
|
select {
|
|
case c.workerCh <- asyncCacheOp{
|
|
ctx: ctx,
|
|
opType: "get",
|
|
key: key,
|
|
result: resultCh,
|
|
}:
|
|
case <-ctx.Done():
|
|
resultCh <- CacheResult{Error: ctx.Err()}
|
|
close(resultCh)
|
|
}
|
|
|
|
return resultCh
|
|
}
|
|
|
|
// ASet asynchronously stores a value in the cache.
|
|
func (c *AsyncMemoryCache) ASet(ctx context.Context, key string, value any, ttl time.Duration) <-chan error {
|
|
doneCh := make(chan error, 1)
|
|
|
|
select {
|
|
case c.workerCh <- asyncCacheOp{
|
|
ctx: ctx,
|
|
opType: "set",
|
|
key: key,
|
|
value: value,
|
|
ttl: ttl,
|
|
done: doneCh,
|
|
}:
|
|
case <-ctx.Done():
|
|
doneCh <- ctx.Err()
|
|
close(doneCh)
|
|
}
|
|
|
|
return doneCh
|
|
}
|
|
|
|
// ADelete asynchronously removes a value from the cache.
|
|
func (c *AsyncMemoryCache) ADelete(ctx context.Context, key string) <-chan error {
|
|
doneCh := make(chan error, 1)
|
|
|
|
select {
|
|
case c.workerCh <- asyncCacheOp{
|
|
ctx: ctx,
|
|
opType: "delete",
|
|
key: key,
|
|
done: doneCh,
|
|
}:
|
|
case <-ctx.Done():
|
|
doneCh <- ctx.Err()
|
|
close(doneCh)
|
|
}
|
|
|
|
return doneCh
|
|
}
|
|
|
|
// Stop stops the async cache workers.
|
|
func (c *AsyncMemoryCache) Stop() {
|
|
close(c.stopCh)
|
|
}
|
|
|
|
// AsyncCachePolicy configures async cache behavior.
|
|
type AsyncCachePolicy struct {
|
|
// KeyFunc generates the cache key.
|
|
KeyFunc func(context.Context, any) string
|
|
|
|
// TTL is the time-to-live for cached values.
|
|
TTL *time.Duration
|
|
|
|
// Async determines if operations should be async.
|
|
Async bool
|
|
}
|
|
|
|
// AsyncCachedExecutor wraps a function with async caching.
|
|
type AsyncCachedExecutor struct {
|
|
cache AsyncCache
|
|
cachePolicy *AsyncCachePolicy
|
|
}
|
|
|
|
// NewAsyncCachedExecutor creates a new async cached executor.
|
|
func NewAsyncCachedExecutor(cache AsyncCache, policy *AsyncCachePolicy) *AsyncCachedExecutor {
|
|
return &AsyncCachedExecutor{
|
|
cache: cache,
|
|
cachePolicy: policy,
|
|
}
|
|
}
|
|
|
|
// Execute executes a function with async caching.
|
|
func (e *AsyncCachedExecutor) Execute(
|
|
ctx context.Context,
|
|
nodeName string,
|
|
input any,
|
|
fn func(context.Context, any) (any, error),
|
|
) (any, error) {
|
|
// Generate cache key
|
|
var key string
|
|
if e.cachePolicy != nil && e.cachePolicy.KeyFunc != nil {
|
|
key = e.cachePolicy.KeyFunc(ctx, input)
|
|
} else {
|
|
key = GenerateCacheKey(nodeName, input)
|
|
}
|
|
|
|
// Check cache asynchronously
|
|
if e.cachePolicy != nil && e.cachePolicy.Async {
|
|
resultCh := e.cache.AGet(ctx, key)
|
|
|
|
select {
|
|
case result := <-resultCh:
|
|
if result.Error != nil {
|
|
return nil, result.Error
|
|
}
|
|
if result.Found {
|
|
return result.Value, nil
|
|
}
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
} else {
|
|
// Synchronous fallback
|
|
if cached, ok := e.cache.Get(ctx, key); ok {
|
|
return cached, nil
|
|
}
|
|
}
|
|
|
|
// Execute function
|
|
result, err := fn(ctx, input)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Cache result asynchronously
|
|
var ttl time.Duration
|
|
if e.cachePolicy != nil && e.cachePolicy.TTL != nil {
|
|
ttl = *e.cachePolicy.TTL
|
|
}
|
|
|
|
if e.cachePolicy != nil && e.cachePolicy.Async {
|
|
// Fire and forget async set
|
|
e.cache.ASet(context.Background(), key, result, ttl)
|
|
} else {
|
|
e.cache.Set(ctx, key, result, ttl)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// WaitForPending waits for all pending async operations to complete.
|
|
func (e *AsyncCachedExecutor) WaitForPending(timeout time.Duration) bool {
|
|
// In a real implementation, this would track pending operations
|
|
// For now, just sleep briefly to allow operations to complete
|
|
time.Sleep(timeout)
|
|
return true
|
|
}
|