fix(redis): circuit breaking under high concurrency (#5640) (#5654)

This commit is contained in:
shenbaise9527
2026-06-27 18:57:26 +08:00
committed by GitHub
parent b2e3aa1587
commit dbc71bb57b
10 changed files with 312 additions and 35 deletions

View File

@@ -10,6 +10,7 @@ import (
var ignoreCmds = map[string]lang.PlaceholderType{ var ignoreCmds = map[string]lang.PlaceholderType{
"blpop": {}, "blpop": {},
"hello": {},
} }
type breakerHook struct { type breakerHook struct {

View File

@@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/alicebob/miniredis/v2" "github.com/alicebob/miniredis/v2"
red "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/breaker" "github.com/zeromicro/go-zero/core/breaker"
) )
@@ -75,6 +76,45 @@ func TestBreakerHook_ProcessHook(t *testing.T) {
} }
assert.Equal(t, someError.Error(), err.Error()) assert.Equal(t, someError.Error(), err.Error())
}) })
t.Run("breakerHook_ignoreHello", func(t *testing.T) {
// hello is issued on connection init and is in ignoreCmds, so repeated
// failures must never trip the breaker into ErrServiceUnavailable.
h := breakerHook{brk: breaker.NewBreaker()}
someError := errors.New("ERR some error")
process := h.ProcessHook(func(_ context.Context, _ red.Cmder) error {
return someError
})
ctx := context.Background()
var err error
for i := 0; i < 1000; i++ {
err = process(ctx, red.NewCmd(ctx, "hello", 3))
if err != nil && err.Error() != someError.Error() {
break
}
}
assert.Equal(t, someError.Error(), err.Error())
})
t.Run("breakerHook_notIgnored", func(t *testing.T) {
// a regular command is not ignored, so repeated failures open the breaker.
h := breakerHook{brk: breaker.NewBreaker()}
someError := errors.New("ERR some error")
process := h.ProcessHook(func(_ context.Context, _ red.Cmder) error {
return someError
})
ctx := context.Background()
var err error
for i := 0; i < 1000; i++ {
err = process(ctx, red.NewCmd(ctx, "get", "key"))
if err != nil && err.Error() != someError.Error() {
break
}
}
assert.Equal(t, breaker.ErrServiceUnavailable, err)
})
} }
func TestBreakerHook_ProcessPipelineHook(t *testing.T) { func TestBreakerHook_ProcessPipelineHook(t *testing.T) {

View File

@@ -23,6 +23,30 @@ type (
Pass string `json:",optional"` Pass string `json:",optional"`
Tls bool `json:",optional"` Tls bool `json:",optional"`
NonBlock bool `json:",default=true"` NonBlock bool `json:",default=true"`
// DisableIdentity is used to disable CLIENT SETINFO command on connect.
//
// Some redis versions/proxies do not support CLIENT SETINFO and return an
// error on connect; since that command runs through the breaker hook it can
// trip the breaker. Set this to true to skip it on such servers. Together
// with the default MaintNotifications=disabled (and the always-ignored
// HELLO command), this keeps the connect-time commands from tripping the
// breaker on incompatible servers, without forcing RESP2.
//
// default: false
DisableIdentity bool `json:",default=false"`
// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
//
// default: 3.
Protocol int `json:",default=3"`
// MaintNotifications controls the CLIENT MAINT_NOTIFICATIONS handshake mode
// (go-redis MaintNotificationsConfig.Mode):
// - disabled: never send the command (avoids tripping the breaker on servers
// that don't support it; keeps RESP3 intact)
// - auto: try, silently fall back on error (go-redis default)
// - enabled: force, fail the connection on error
//
// default: disabled
MaintNotifications string `json:",default=disabled,options=disabled|enabled|auto"`
// PingTimeout is the timeout for ping redis. // PingTimeout is the timeout for ping redis.
PingTimeout time.Duration `json:",default=1s"` PingTimeout time.Duration `json:",default=1s"`
} }

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
red "github.com/redis/go-redis/v9" red "github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9/maintnotifications"
"github.com/zeromicro/go-zero/core/breaker" "github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/core/errorx" "github.com/zeromicro/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
@@ -57,6 +58,9 @@ type (
Type string Type string
User string User string
Pass string Pass string
protocol int
identity bool
maintNotifications maintnotifications.Mode
tls bool tls bool
brk breaker.Breaker brk breaker.Breaker
hooks []red.Hook hooks []red.Hook
@@ -136,6 +140,15 @@ func NewRedis(conf RedisConf, opts ...Option) (*Redis, error) {
if conf.Tls { if conf.Tls {
opts = append([]Option{WithTLS()}, opts...) opts = append([]Option{WithTLS()}, opts...)
} }
if conf.Protocol > 0 {
opts = append([]Option{WithProtocol(conf.Protocol)}, opts...)
}
if conf.DisableIdentity {
opts = append([]Option{WithIdentity()}, opts...)
}
if len(conf.MaintNotifications) > 0 {
opts = append([]Option{WithMaintNotifications(conf.MaintNotifications)}, opts...)
}
rds := newRedis(conf.Host, opts...) rds := newRedis(conf.Host, opts...)
if !conf.NonBlock { if !conf.NonBlock {
@@ -2726,6 +2739,40 @@ func WithUser(user string) Option {
} }
} }
// WithProtocol customizes the given Redis with protocol.
func WithProtocol(protocol int) Option {
return func(r *Redis) {
r.protocol = protocol
}
}
// WithIdentity customizes the given Redis with Identity enabled.
func WithIdentity() Option {
return func(r *Redis) {
r.identity = true
}
}
// WithMaintNotifications customizes the given Redis with the maintenance
// notifications mode (disabled, enabled or auto).
func WithMaintNotifications(mode string) Option {
return func(r *Redis) {
r.maintNotifications = maintnotifications.Mode(mode)
}
}
// maintNotificationsConfig builds the go-redis maintenance notifications config
// from the configured mode, defaulting to disabled when unset so that the
// CLIENT MAINT_NOTIFICATIONS command is not issued on connect.
func (r *Redis) maintNotificationsConfig() *maintnotifications.Config {
mode := r.maintNotifications
if mode == "" {
mode = maintnotifications.ModeDisabled
}
return &maintnotifications.Config{Mode: mode}
}
func acceptable(err error) bool { func acceptable(err error) bool {
return err == nil || errorx.In(err, red.Nil, context.Canceled) return err == nil || errorx.In(err, red.Nil, context.Canceled)
} }

View File

@@ -11,6 +11,7 @@ import (
"github.com/alicebob/miniredis/v2" "github.com/alicebob/miniredis/v2"
red "github.com/redis/go-redis/v9" red "github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9/maintnotifications"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
@@ -150,6 +151,82 @@ func TestNewRedis(t *testing.T) {
} }
} }
func TestGetClientWithProtocolAndIdentity(t *testing.T) {
r := miniredis.RunT(t)
defer r.Close()
c, err := getClient(&Redis{
Addr: r.Addr(),
Type: NodeType,
protocol: 2,
identity: true,
})
if assert.NoError(t, err) {
assert.NotNil(t, c)
assert.Equal(t, 2, c.Options().Protocol)
assert.True(t, c.Options().DisableIdentity)
}
}
func TestNewRedis_ProtocolAndIdentity(t *testing.T) {
logx.Disable()
s := miniredis.RunT(t)
rds, err := NewRedis(RedisConf{
Host: s.Addr(),
Type: NodeType,
Protocol: 2,
DisableIdentity: true,
})
if assert.NoError(t, err) {
assert.Equal(t, 2, rds.protocol)
assert.True(t, rds.identity)
}
}
func TestGetClientWithMaintNotifications(t *testing.T) {
tests := []struct {
name string
mode maintnotifications.Mode
want maintnotifications.Mode
}{
{name: "unset falls back to disabled", mode: "", want: maintnotifications.ModeDisabled},
{name: "disabled", mode: maintnotifications.ModeDisabled, want: maintnotifications.ModeDisabled},
{name: "enabled", mode: maintnotifications.ModeEnabled, want: maintnotifications.ModeEnabled},
{name: "auto", mode: maintnotifications.ModeAuto, want: maintnotifications.ModeAuto},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := miniredis.RunT(t)
defer r.Close()
c, err := getClient(&Redis{
Addr: r.Addr(),
Type: NodeType,
maintNotifications: test.mode,
})
if assert.NoError(t, err) {
assert.NotNil(t, c)
assert.NotNil(t, c.Options().MaintNotificationsConfig)
assert.Equal(t, test.want, c.Options().MaintNotificationsConfig.Mode)
}
})
}
}
func TestNewRedis_MaintNotifications(t *testing.T) {
logx.Disable()
s := miniredis.RunT(t)
rds, err := NewRedis(RedisConf{
Host: s.Addr(),
Type: NodeType,
MaintNotifications: string(maintnotifications.ModeAuto),
})
if assert.NoError(t, err) {
assert.Equal(t, maintnotifications.ModeAuto, rds.maintNotifications)
}
}
func TestRedis_NonBlock(t *testing.T) { func TestRedis_NonBlock(t *testing.T) {
logx.Disable() logx.Disable()

View File

@@ -58,6 +58,9 @@ func CreateBlockingNode(r *Redis) (ClosableNode, error) {
PoolSize: 1, PoolSize: 1,
MinIdleConns: 1, MinIdleConns: 1,
ReadTimeout: timeout, ReadTimeout: timeout,
Protocol: r.protocol,
DisableIdentity: r.identity,
MaintNotificationsConfig: r.maintNotificationsConfig(),
}) })
return &clientBridge{client}, nil return &clientBridge{client}, nil
case ClusterType: case ClusterType:
@@ -69,6 +72,9 @@ func CreateBlockingNode(r *Redis) (ClosableNode, error) {
PoolSize: 1, PoolSize: 1,
MinIdleConns: 1, MinIdleConns: 1,
ReadTimeout: timeout, ReadTimeout: timeout,
Protocol: r.protocol,
DisableIdentity: r.identity,
MaintNotificationsConfig: r.maintNotificationsConfig(),
}) })
return &clusterBridge{client}, nil return &clusterBridge{client}, nil
default: default:

View File

@@ -43,4 +43,32 @@ func TestBlockingNode(t *testing.T) {
_, err = CreateBlockingNode(New(r.Addr(), badType())) _, err = CreateBlockingNode(New(r.Addr(), badType()))
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("test blocking node with protocol and identity", func(t *testing.T) {
r, err := miniredis.Run()
assert.NoError(t, err)
defer r.Close()
node, err := CreateBlockingNode(New(r.Addr(), WithProtocol(2), WithIdentity()))
assert.NoError(t, err)
bridge, ok := node.(*clientBridge)
assert.True(t, ok)
assert.Equal(t, 2, bridge.Options().Protocol)
assert.True(t, bridge.Options().DisableIdentity)
node.Close()
})
t.Run("test blocking node with cluster, protocol and identity", func(t *testing.T) {
r, err := miniredis.Run()
assert.NoError(t, err)
defer r.Close()
node, err := CreateBlockingNode(New(r.Addr(), Cluster(), WithProtocol(2), WithIdentity()))
assert.NoError(t, err)
bridge, ok := node.(*clusterBridge)
assert.True(t, ok)
assert.Equal(t, 2, bridge.Options().Protocol)
assert.True(t, bridge.Options().DisableIdentity)
node.Close()
})
} }

View File

@@ -37,6 +37,9 @@ func getClient(r *Redis) (*red.Client, error) {
MaxRetries: maxRetries, MaxRetries: maxRetries,
MinIdleConns: idleConns, MinIdleConns: idleConns,
TLSConfig: tlsConfig, TLSConfig: tlsConfig,
Protocol: r.protocol,
DisableIdentity: r.identity,
MaintNotificationsConfig: r.maintNotificationsConfig(),
}) })
hooks := append([]red.Hook{defaultDurationHook, breakerHook{ hooks := append([]red.Hook{defaultDurationHook, breakerHook{

View File

@@ -33,6 +33,9 @@ func getCluster(r *Redis) (*red.ClusterClient, error) {
MaxRetries: maxRetries, MaxRetries: maxRetries,
MinIdleConns: idleConns, MinIdleConns: idleConns,
TLSConfig: tlsConfig, TLSConfig: tlsConfig,
Protocol: r.protocol,
DisableIdentity: r.identity,
MaintNotificationsConfig: r.maintNotificationsConfig(),
}) })
hooks := append([]red.Hook{defaultDurationHook, breakerHook{ hooks := append([]red.Hook{defaultDurationHook, breakerHook{

View File

@@ -5,6 +5,7 @@ import (
"github.com/alicebob/miniredis/v2" "github.com/alicebob/miniredis/v2"
red "github.com/redis/go-redis/v9" red "github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9/maintnotifications"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -57,3 +58,50 @@ func TestGetCluster(t *testing.T) {
assert.NotNil(t, c) assert.NotNil(t, c)
} }
} }
func TestGetClusterWithProtocolAndIdentity(t *testing.T) {
r := miniredis.RunT(t)
defer r.Close()
c, err := getCluster(&Redis{
Addr: r.Addr(),
Type: ClusterType,
protocol: 2,
identity: true,
hooks: []red.Hook{defaultDurationHook},
})
if assert.NoError(t, err) {
assert.NotNil(t, c)
assert.Equal(t, 2, c.Options().Protocol)
assert.True(t, c.Options().DisableIdentity)
}
}
func TestGetClusterWithMaintNotifications(t *testing.T) {
tests := []struct {
name string
mode maintnotifications.Mode
want maintnotifications.Mode
}{
{name: "unset falls back to disabled", mode: "", want: maintnotifications.ModeDisabled},
{name: "disabled", mode: maintnotifications.ModeDisabled, want: maintnotifications.ModeDisabled},
{name: "auto", mode: maintnotifications.ModeAuto, want: maintnotifications.ModeAuto},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := miniredis.RunT(t)
defer r.Close()
c, err := getCluster(&Redis{
Addr: r.Addr(),
Type: ClusterType,
maintNotifications: test.mode,
hooks: []red.Hook{defaultDurationHook},
})
if assert.NoError(t, err) {
assert.NotNil(t, c)
assert.NotNil(t, c.Options().MaintNotificationsConfig)
assert.Equal(t, test.want, c.Options().MaintNotificationsConfig.Mode)
}
})
}
}