mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-07-05 02:55:48 +08:00
### What problem does this PR solve? - Tools management - Pregel engine wrapper for better usage - UT race - Coding style ### Type of change - [x] Refactoring
233 lines
5.2 KiB
Go
233 lines
5.2 KiB
Go
// Package pregel provides caching support for Pregel execution.
|
|
package pregel
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"ragflow/internal/harness/graph/types"
|
|
)
|
|
|
|
// Cache is the interface for caching node outputs.
|
|
type Cache interface {
|
|
// Get retrieves a value from the cache.
|
|
Get(ctx context.Context, key string) (any, bool)
|
|
// Set stores a value in the cache.
|
|
Set(ctx context.Context, key string, value any, ttl time.Duration)
|
|
// Delete removes a value from the cache.
|
|
Delete(ctx context.Context, key string)
|
|
// Clear clears all values from the cache.
|
|
Clear()
|
|
}
|
|
|
|
// MemoryCache is an in-memory cache implementation.
|
|
type MemoryCache struct {
|
|
mu sync.RWMutex
|
|
data map[string]*cacheEntry
|
|
maxSize int
|
|
eviction EvictionPolicy
|
|
}
|
|
|
|
type cacheEntry struct {
|
|
value any
|
|
expiration time.Time
|
|
lastAccess time.Time
|
|
hits int64
|
|
}
|
|
|
|
// EvictionPolicy determines how entries are evicted when cache is full.
|
|
type EvictionPolicy int
|
|
|
|
const (
|
|
// EvictLRU evicts least recently used entries.
|
|
EvictLRU EvictionPolicy = iota
|
|
// EvictLFU evicts least frequently used entries.
|
|
EvictLFU
|
|
// EvictRandom evicts random entries.
|
|
EvictRandom
|
|
)
|
|
|
|
// NewMemoryCache creates a new in-memory cache.
|
|
func NewMemoryCache(maxSize int, eviction EvictionPolicy) *MemoryCache {
|
|
return &MemoryCache{
|
|
data: make(map[string]*cacheEntry),
|
|
maxSize: maxSize,
|
|
eviction: eviction,
|
|
}
|
|
}
|
|
|
|
// Get retrieves a value from the cache.
|
|
func (c *MemoryCache) Get(ctx context.Context, key string) (any, bool) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
entry, ok := c.data[key]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
// Check expiration
|
|
if !entry.expiration.IsZero() && time.Now().After(entry.expiration) {
|
|
return nil, false
|
|
}
|
|
|
|
entry.hits++
|
|
entry.lastAccess = time.Now()
|
|
return entry.value, true
|
|
}
|
|
|
|
// Set stores a value in the cache.
|
|
func (c *MemoryCache) Set(ctx context.Context, key string, value any, ttl time.Duration) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Evict entries if cache is full
|
|
if len(c.data) >= c.maxSize {
|
|
c.evict()
|
|
}
|
|
|
|
expiration := time.Time{}
|
|
if ttl > 0 {
|
|
expiration = time.Now().Add(ttl)
|
|
}
|
|
|
|
c.data[key] = &cacheEntry{
|
|
value: value,
|
|
expiration: expiration,
|
|
lastAccess: time.Now(),
|
|
hits: 0,
|
|
}
|
|
}
|
|
|
|
// Delete removes a value from the cache.
|
|
func (c *MemoryCache) Delete(ctx context.Context, key string) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
delete(c.data, key)
|
|
}
|
|
|
|
// Clear clears all values from the cache.
|
|
func (c *MemoryCache) Clear() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.data = make(map[string]*cacheEntry)
|
|
}
|
|
|
|
// evict removes an entry based on the eviction policy.
|
|
func (c *MemoryCache) evict() {
|
|
if len(c.data) == 0 {
|
|
return
|
|
}
|
|
|
|
var keyToDelete string
|
|
|
|
switch c.eviction {
|
|
case EvictLRU:
|
|
// Find least recently used entry
|
|
var oldest time.Time
|
|
for k, v := range c.data {
|
|
if oldest.IsZero() || v.lastAccess.Before(oldest) {
|
|
oldest = v.lastAccess
|
|
keyToDelete = k
|
|
}
|
|
}
|
|
case EvictLFU:
|
|
// Find least frequently used
|
|
var minHits int64 = -1
|
|
for k, v := range c.data {
|
|
if minHits == -1 || v.hits < minHits {
|
|
minHits = v.hits
|
|
keyToDelete = k
|
|
}
|
|
}
|
|
case EvictRandom:
|
|
// Delete first entry (Go map iteration is randomized)
|
|
for k := range c.data {
|
|
keyToDelete = k
|
|
break
|
|
}
|
|
}
|
|
|
|
if keyToDelete != "" {
|
|
delete(c.data, keyToDelete)
|
|
}
|
|
}
|
|
|
|
// GenerateCacheKey generates a cache key from the given input.
|
|
func GenerateCacheKey(nodeName string, input any) string {
|
|
data, err := json.Marshal(input)
|
|
if err != nil {
|
|
// Fall back to the type name so different inputs still produce different keys.
|
|
return fmt.Sprintf("%s:%T", nodeName, input)
|
|
}
|
|
hash := sha256.Sum256(data)
|
|
return fmt.Sprintf("%s:%s", nodeName, hex.EncodeToString(hash[:]))
|
|
}
|
|
|
|
// CachedExecutor wraps a function with caching.
|
|
type CachedExecutor struct {
|
|
cache Cache
|
|
cachePolicy *types.CachePolicy
|
|
}
|
|
|
|
// NewCachedExecutor creates a new cached executor.
|
|
func NewCachedExecutor(cache Cache, policy *types.CachePolicy) *CachedExecutor {
|
|
return &CachedExecutor{
|
|
cache: cache,
|
|
cachePolicy: policy,
|
|
}
|
|
}
|
|
|
|
// Execute executes a function with caching.
|
|
func (e *CachedExecutor) Execute(ctx context.Context, nodeName string, input any, fn func(context.Context, any) (any, error)) (any, error) {
|
|
// Generate cache key
|
|
var key string
|
|
if e.cachePolicy != nil && e.cachePolicy.KeyFunc != nil {
|
|
key = e.cachePolicy.KeyFunc(ctx, input)
|
|
} else {
|
|
key = GenerateCacheKey(nodeName, input)
|
|
}
|
|
|
|
// Check cache
|
|
if cached, ok := e.cache.Get(ctx, key); ok {
|
|
return cached, nil
|
|
}
|
|
|
|
// Execute function
|
|
result, err := fn(ctx, input)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Cache result
|
|
var ttl time.Duration
|
|
if e.cachePolicy != nil && e.cachePolicy.TTL != nil {
|
|
ttl = *e.cachePolicy.TTL
|
|
}
|
|
e.cache.Set(ctx, key, result, ttl)
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// NoopCache is a cache that doesn't store anything.
|
|
type NoopCache struct{}
|
|
|
|
// Get always returns false.
|
|
func (n *NoopCache) Get(ctx context.Context, key string) (any, bool) {
|
|
return nil, false
|
|
}
|
|
|
|
// Set is a no-op.
|
|
func (n *NoopCache) Set(ctx context.Context, key string, value any, ttl time.Duration) {}
|
|
|
|
// Delete is a no-op.
|
|
func (n *NoopCache) Delete(ctx context.Context, key string) {}
|
|
|
|
// Clear is a no-op.
|
|
func (n *NoopCache) Clear() {}
|