From dbc71bb57b764e2562831949d57baeb7957826bd Mon Sep 17 00:00:00 2001 From: shenbaise9527 Date: Sat, 27 Jun 2026 18:57:26 +0800 Subject: [PATCH] fix(redis): circuit breaking under high concurrency (#5640) (#5654) --- core/stores/redis/breakerhook.go | 1 + core/stores/redis/breakerhook_test.go | 40 ++++++++++ core/stores/redis/conf.go | 24 ++++++ core/stores/redis/redis.go | 61 +++++++++++++-- core/stores/redis/redis_test.go | 77 +++++++++++++++++++ core/stores/redis/redisblockingnode.go | 36 +++++---- core/stores/redis/redisblockingnode_test.go | 28 +++++++ core/stores/redis/redisclientmanager.go | 17 ++-- core/stores/redis/redisclustermanager.go | 15 ++-- core/stores/redis/redisclustermanager_test.go | 48 ++++++++++++ 10 files changed, 312 insertions(+), 35 deletions(-) diff --git a/core/stores/redis/breakerhook.go b/core/stores/redis/breakerhook.go index c8bdce7da..c77670012 100644 --- a/core/stores/redis/breakerhook.go +++ b/core/stores/redis/breakerhook.go @@ -10,6 +10,7 @@ import ( var ignoreCmds = map[string]lang.PlaceholderType{ "blpop": {}, + "hello": {}, } type breakerHook struct { diff --git a/core/stores/redis/breakerhook_test.go b/core/stores/redis/breakerhook_test.go index ebd415049..25bd7f375 100644 --- a/core/stores/redis/breakerhook_test.go +++ b/core/stores/redis/breakerhook_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/alicebob/miniredis/v2" + red "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/breaker" ) @@ -75,6 +76,45 @@ func TestBreakerHook_ProcessHook(t *testing.T) { } assert.Equal(t, someError.Error(), err.Error()) }) + + t.Run("breakerHook_ignoreHello", func(t *testing.T) { + // hello is issued on connection init and is in ignoreCmds, so repeated + // failures must never trip the breaker into ErrServiceUnavailable. + h := breakerHook{brk: breaker.NewBreaker()} + someError := errors.New("ERR some error") + process := h.ProcessHook(func(_ context.Context, _ red.Cmder) error { + return someError + }) + + ctx := context.Background() + var err error + for i := 0; i < 1000; i++ { + err = process(ctx, red.NewCmd(ctx, "hello", 3)) + if err != nil && err.Error() != someError.Error() { + break + } + } + assert.Equal(t, someError.Error(), err.Error()) + }) + + t.Run("breakerHook_notIgnored", func(t *testing.T) { + // a regular command is not ignored, so repeated failures open the breaker. + h := breakerHook{brk: breaker.NewBreaker()} + someError := errors.New("ERR some error") + process := h.ProcessHook(func(_ context.Context, _ red.Cmder) error { + return someError + }) + + ctx := context.Background() + var err error + for i := 0; i < 1000; i++ { + err = process(ctx, red.NewCmd(ctx, "get", "key")) + if err != nil && err.Error() != someError.Error() { + break + } + } + assert.Equal(t, breaker.ErrServiceUnavailable, err) + }) } func TestBreakerHook_ProcessPipelineHook(t *testing.T) { diff --git a/core/stores/redis/conf.go b/core/stores/redis/conf.go index da205bc00..0db4256ba 100644 --- a/core/stores/redis/conf.go +++ b/core/stores/redis/conf.go @@ -23,6 +23,30 @@ type ( Pass string `json:",optional"` Tls bool `json:",optional"` NonBlock bool `json:",default=true"` + // DisableIdentity is used to disable CLIENT SETINFO command on connect. + // + // Some redis versions/proxies do not support CLIENT SETINFO and return an + // error on connect; since that command runs through the breaker hook it can + // trip the breaker. Set this to true to skip it on such servers. Together + // with the default MaintNotifications=disabled (and the always-ignored + // HELLO command), this keeps the connect-time commands from tripping the + // breaker on incompatible servers, without forcing RESP2. + // + // default: false + DisableIdentity bool `json:",default=false"` + // Protocol 2 or 3. Use the version to negotiate RESP version with redis-server. + // + // default: 3. + Protocol int `json:",default=3"` + // MaintNotifications controls the CLIENT MAINT_NOTIFICATIONS handshake mode + // (go-redis MaintNotificationsConfig.Mode): + // - disabled: never send the command (avoids tripping the breaker on servers + // that don't support it; keeps RESP3 intact) + // - auto: try, silently fall back on error (go-redis default) + // - enabled: force, fail the connection on error + // + // default: disabled + MaintNotifications string `json:",default=disabled,options=disabled|enabled|auto"` // PingTimeout is the timeout for ping redis. PingTimeout time.Duration `json:",default=1s"` } diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 4f1611651..4a91eace1 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -8,6 +8,7 @@ import ( "time" red "github.com/redis/go-redis/v9" + "github.com/redis/go-redis/v9/maintnotifications" "github.com/zeromicro/go-zero/core/breaker" "github.com/zeromicro/go-zero/core/errorx" "github.com/zeromicro/go-zero/core/logx" @@ -53,13 +54,16 @@ type ( // Redis defines a redis node/cluster. It is thread-safe. Redis struct { - Addr string - Type string - User string - Pass string - tls bool - brk breaker.Breaker - hooks []red.Hook + Addr string + Type string + User string + Pass string + protocol int + identity bool + maintNotifications maintnotifications.Mode + tls bool + brk breaker.Breaker + hooks []red.Hook } // RedisNode interface represents a redis node. @@ -136,6 +140,15 @@ func NewRedis(conf RedisConf, opts ...Option) (*Redis, error) { if conf.Tls { opts = append([]Option{WithTLS()}, opts...) } + if conf.Protocol > 0 { + opts = append([]Option{WithProtocol(conf.Protocol)}, opts...) + } + if conf.DisableIdentity { + opts = append([]Option{WithIdentity()}, opts...) + } + if len(conf.MaintNotifications) > 0 { + opts = append([]Option{WithMaintNotifications(conf.MaintNotifications)}, opts...) + } rds := newRedis(conf.Host, opts...) if !conf.NonBlock { @@ -2726,6 +2739,40 @@ func WithUser(user string) Option { } } +// WithProtocol customizes the given Redis with protocol. +func WithProtocol(protocol int) Option { + return func(r *Redis) { + r.protocol = protocol + } +} + +// WithIdentity customizes the given Redis with Identity enabled. +func WithIdentity() Option { + return func(r *Redis) { + r.identity = true + } +} + +// WithMaintNotifications customizes the given Redis with the maintenance +// notifications mode (disabled, enabled or auto). +func WithMaintNotifications(mode string) Option { + return func(r *Redis) { + r.maintNotifications = maintnotifications.Mode(mode) + } +} + +// maintNotificationsConfig builds the go-redis maintenance notifications config +// from the configured mode, defaulting to disabled when unset so that the +// CLIENT MAINT_NOTIFICATIONS command is not issued on connect. +func (r *Redis) maintNotificationsConfig() *maintnotifications.Config { + mode := r.maintNotifications + if mode == "" { + mode = maintnotifications.ModeDisabled + } + + return &maintnotifications.Config{Mode: mode} +} + func acceptable(err error) bool { return err == nil || errorx.In(err, red.Nil, context.Canceled) } diff --git a/core/stores/redis/redis_test.go b/core/stores/redis/redis_test.go index ee00df59e..f2eecf2ee 100644 --- a/core/stores/redis/redis_test.go +++ b/core/stores/redis/redis_test.go @@ -11,6 +11,7 @@ import ( "github.com/alicebob/miniredis/v2" red "github.com/redis/go-redis/v9" + "github.com/redis/go-redis/v9/maintnotifications" "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stringx" @@ -150,6 +151,82 @@ func TestNewRedis(t *testing.T) { } } +func TestGetClientWithProtocolAndIdentity(t *testing.T) { + r := miniredis.RunT(t) + defer r.Close() + c, err := getClient(&Redis{ + Addr: r.Addr(), + Type: NodeType, + protocol: 2, + identity: true, + }) + if assert.NoError(t, err) { + assert.NotNil(t, c) + assert.Equal(t, 2, c.Options().Protocol) + assert.True(t, c.Options().DisableIdentity) + } +} + +func TestNewRedis_ProtocolAndIdentity(t *testing.T) { + logx.Disable() + + s := miniredis.RunT(t) + rds, err := NewRedis(RedisConf{ + Host: s.Addr(), + Type: NodeType, + Protocol: 2, + DisableIdentity: true, + }) + if assert.NoError(t, err) { + assert.Equal(t, 2, rds.protocol) + assert.True(t, rds.identity) + } +} + +func TestGetClientWithMaintNotifications(t *testing.T) { + tests := []struct { + name string + mode maintnotifications.Mode + want maintnotifications.Mode + }{ + {name: "unset falls back to disabled", mode: "", want: maintnotifications.ModeDisabled}, + {name: "disabled", mode: maintnotifications.ModeDisabled, want: maintnotifications.ModeDisabled}, + {name: "enabled", mode: maintnotifications.ModeEnabled, want: maintnotifications.ModeEnabled}, + {name: "auto", mode: maintnotifications.ModeAuto, want: maintnotifications.ModeAuto}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r := miniredis.RunT(t) + defer r.Close() + c, err := getClient(&Redis{ + Addr: r.Addr(), + Type: NodeType, + maintNotifications: test.mode, + }) + if assert.NoError(t, err) { + assert.NotNil(t, c) + assert.NotNil(t, c.Options().MaintNotificationsConfig) + assert.Equal(t, test.want, c.Options().MaintNotificationsConfig.Mode) + } + }) + } +} + +func TestNewRedis_MaintNotifications(t *testing.T) { + logx.Disable() + + s := miniredis.RunT(t) + rds, err := NewRedis(RedisConf{ + Host: s.Addr(), + Type: NodeType, + MaintNotifications: string(maintnotifications.ModeAuto), + }) + if assert.NoError(t, err) { + assert.Equal(t, maintnotifications.ModeAuto, rds.maintNotifications) + } +} + func TestRedis_NonBlock(t *testing.T) { logx.Disable() diff --git a/core/stores/redis/redisblockingnode.go b/core/stores/redis/redisblockingnode.go index b590234ac..871e42102 100644 --- a/core/stores/redis/redisblockingnode.go +++ b/core/stores/redis/redisblockingnode.go @@ -50,25 +50,31 @@ func CreateBlockingNode(r *Redis) (ClosableNode, error) { switch r.Type { case NodeType: client := red.NewClient(&red.Options{ - Addr: r.Addr, - Username: r.User, - Password: r.Pass, - DB: defaultDatabase, - MaxRetries: maxRetries, - PoolSize: 1, - MinIdleConns: 1, - ReadTimeout: timeout, + Addr: r.Addr, + Username: r.User, + Password: r.Pass, + DB: defaultDatabase, + MaxRetries: maxRetries, + PoolSize: 1, + MinIdleConns: 1, + ReadTimeout: timeout, + Protocol: r.protocol, + DisableIdentity: r.identity, + MaintNotificationsConfig: r.maintNotificationsConfig(), }) return &clientBridge{client}, nil case ClusterType: client := red.NewClusterClient(&red.ClusterOptions{ - Addrs: splitClusterAddrs(r.Addr), - Username: r.User, - Password: r.Pass, - MaxRetries: maxRetries, - PoolSize: 1, - MinIdleConns: 1, - ReadTimeout: timeout, + Addrs: splitClusterAddrs(r.Addr), + Username: r.User, + Password: r.Pass, + MaxRetries: maxRetries, + PoolSize: 1, + MinIdleConns: 1, + ReadTimeout: timeout, + Protocol: r.protocol, + DisableIdentity: r.identity, + MaintNotificationsConfig: r.maintNotificationsConfig(), }) return &clusterBridge{client}, nil default: diff --git a/core/stores/redis/redisblockingnode_test.go b/core/stores/redis/redisblockingnode_test.go index dc0d40837..e222b9414 100644 --- a/core/stores/redis/redisblockingnode_test.go +++ b/core/stores/redis/redisblockingnode_test.go @@ -43,4 +43,32 @@ func TestBlockingNode(t *testing.T) { _, err = CreateBlockingNode(New(r.Addr(), badType())) assert.Error(t, err) }) + + t.Run("test blocking node with protocol and identity", func(t *testing.T) { + r, err := miniredis.Run() + assert.NoError(t, err) + defer r.Close() + + node, err := CreateBlockingNode(New(r.Addr(), WithProtocol(2), WithIdentity())) + assert.NoError(t, err) + bridge, ok := node.(*clientBridge) + assert.True(t, ok) + assert.Equal(t, 2, bridge.Options().Protocol) + assert.True(t, bridge.Options().DisableIdentity) + node.Close() + }) + + t.Run("test blocking node with cluster, protocol and identity", func(t *testing.T) { + r, err := miniredis.Run() + assert.NoError(t, err) + defer r.Close() + + node, err := CreateBlockingNode(New(r.Addr(), Cluster(), WithProtocol(2), WithIdentity())) + assert.NoError(t, err) + bridge, ok := node.(*clusterBridge) + assert.True(t, ok) + assert.Equal(t, 2, bridge.Options().Protocol) + assert.True(t, bridge.Options().DisableIdentity) + node.Close() + }) } diff --git a/core/stores/redis/redisclientmanager.go b/core/stores/redis/redisclientmanager.go index 9be0065bd..88a12a701 100644 --- a/core/stores/redis/redisclientmanager.go +++ b/core/stores/redis/redisclientmanager.go @@ -30,13 +30,16 @@ func getClient(r *Redis) (*red.Client, error) { } } store := red.NewClient(&red.Options{ - Addr: r.Addr, - Username: r.User, - Password: r.Pass, - DB: defaultDatabase, - MaxRetries: maxRetries, - MinIdleConns: idleConns, - TLSConfig: tlsConfig, + Addr: r.Addr, + Username: r.User, + Password: r.Pass, + DB: defaultDatabase, + MaxRetries: maxRetries, + MinIdleConns: idleConns, + TLSConfig: tlsConfig, + Protocol: r.protocol, + DisableIdentity: r.identity, + MaintNotificationsConfig: r.maintNotificationsConfig(), }) hooks := append([]red.Hook{defaultDurationHook, breakerHook{ diff --git a/core/stores/redis/redisclustermanager.go b/core/stores/redis/redisclustermanager.go index 6cd9420ba..eb2b1378c 100644 --- a/core/stores/redis/redisclustermanager.go +++ b/core/stores/redis/redisclustermanager.go @@ -27,12 +27,15 @@ func getCluster(r *Redis) (*red.ClusterClient, error) { } } store := red.NewClusterClient(&red.ClusterOptions{ - Addrs: splitClusterAddrs(r.Addr), - Username: r.User, - Password: r.Pass, - MaxRetries: maxRetries, - MinIdleConns: idleConns, - TLSConfig: tlsConfig, + Addrs: splitClusterAddrs(r.Addr), + Username: r.User, + Password: r.Pass, + MaxRetries: maxRetries, + MinIdleConns: idleConns, + TLSConfig: tlsConfig, + Protocol: r.protocol, + DisableIdentity: r.identity, + MaintNotificationsConfig: r.maintNotificationsConfig(), }) hooks := append([]red.Hook{defaultDurationHook, breakerHook{ diff --git a/core/stores/redis/redisclustermanager_test.go b/core/stores/redis/redisclustermanager_test.go index ad4185583..6e3874814 100644 --- a/core/stores/redis/redisclustermanager_test.go +++ b/core/stores/redis/redisclustermanager_test.go @@ -5,6 +5,7 @@ import ( "github.com/alicebob/miniredis/v2" red "github.com/redis/go-redis/v9" + "github.com/redis/go-redis/v9/maintnotifications" "github.com/stretchr/testify/assert" ) @@ -57,3 +58,50 @@ func TestGetCluster(t *testing.T) { assert.NotNil(t, c) } } + +func TestGetClusterWithProtocolAndIdentity(t *testing.T) { + r := miniredis.RunT(t) + defer r.Close() + c, err := getCluster(&Redis{ + Addr: r.Addr(), + Type: ClusterType, + protocol: 2, + identity: true, + hooks: []red.Hook{defaultDurationHook}, + }) + if assert.NoError(t, err) { + assert.NotNil(t, c) + assert.Equal(t, 2, c.Options().Protocol) + assert.True(t, c.Options().DisableIdentity) + } +} + +func TestGetClusterWithMaintNotifications(t *testing.T) { + tests := []struct { + name string + mode maintnotifications.Mode + want maintnotifications.Mode + }{ + {name: "unset falls back to disabled", mode: "", want: maintnotifications.ModeDisabled}, + {name: "disabled", mode: maintnotifications.ModeDisabled, want: maintnotifications.ModeDisabled}, + {name: "auto", mode: maintnotifications.ModeAuto, want: maintnotifications.ModeAuto}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r := miniredis.RunT(t) + defer r.Close() + c, err := getCluster(&Redis{ + Addr: r.Addr(), + Type: ClusterType, + maintNotifications: test.mode, + hooks: []red.Hook{defaultDurationHook}, + }) + if assert.NoError(t, err) { + assert.NotNil(t, c) + assert.NotNil(t, c.Options().MaintNotificationsConfig) + assert.Equal(t, test.want, c.Options().MaintNotificationsConfig.Mode) + } + }) + } +}