mirror of
https://github.com/zeromicro/go-zero.git
synced 2026-06-21 01:11:57 +08:00
feat: add XGroupSetID method to Redis API (#5637)
Co-authored-by: kevin <wanjunfeng@gmail.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -1868,6 +1868,21 @@ func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string
|
|||||||
return conn.XGroupCreate(ctx, stream, group, start).Result()
|
return conn.XGroupCreate(ctx, stream, group, start).Result()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// XGroupSetID sets the last delivered ID for a Redis stream consumer group.
|
||||||
|
func (s *Redis) XGroupSetID(stream, group, start string) (string, error) {
|
||||||
|
return s.XGroupSetIDCtx(context.Background(), stream, group, start)
|
||||||
|
}
|
||||||
|
|
||||||
|
// XGroupSetIDCtx is the context-aware version of XGroupSetID.
|
||||||
|
func (s *Redis) XGroupSetIDCtx(ctx context.Context, stream, group, start string) (string, error) {
|
||||||
|
conn, err := getRedis(s)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn.XGroupSetID(ctx, stream, group, start).Result()
|
||||||
|
}
|
||||||
|
|
||||||
// XInfoConsumers returns information about consumers in a Redis stream consumer group.
|
// XInfoConsumers returns information about consumers in a Redis stream consumer group.
|
||||||
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
|
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
|
||||||
return s.XInfoConsumersCtx(context.Background(), stream, group)
|
return s.XInfoConsumersCtx(context.Background(), stream, group)
|
||||||
|
|||||||
@@ -2294,6 +2294,31 @@ func TestRedisXGroupCreate(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRedisXGroupSetID(t *testing.T) {
|
||||||
|
runOnRedis(t, func(client *Redis) {
|
||||||
|
_, err := newRedis(client.Addr, badType()).XGroupSetID("Source", "Destination", "0")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
|
||||||
|
redisCli := newRedis(client.Addr)
|
||||||
|
stream := "aa"
|
||||||
|
group := "bb"
|
||||||
|
|
||||||
|
_, err = redisCli.XGroupCreateMkStream(stream, group, "0")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
res, err := redisCli.XGroupSetID(stream, group, "0")
|
||||||
|
assert.Empty(t, res)
|
||||||
|
assert.ErrorContains(t, err, "not supported")
|
||||||
|
|
||||||
|
_, err = newRedis(client.Addr, badType()).XGroupSetIDCtx(context.Background(), stream, group, "0")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
|
||||||
|
res, err = redisCli.XGroupSetIDCtx(context.Background(), stream, group, "0")
|
||||||
|
assert.Empty(t, res)
|
||||||
|
assert.ErrorContains(t, err, "not supported")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestRedisXInfo(t *testing.T) {
|
func TestRedisXInfo(t *testing.T) {
|
||||||
runOnRedis(t, func(client *Redis) {
|
runOnRedis(t, func(client *Redis) {
|
||||||
_, err := newRedis(client.Addr, badType()).XInfoStream("Source")
|
_, err := newRedis(client.Addr, badType()).XInfoStream("Source")
|
||||||
|
|||||||
Reference in New Issue
Block a user