From 7b5e7b1c26c8b668fc0dadb180174f4790592a6d Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sat, 16 May 2026 12:35:05 +0800 Subject: [PATCH] fix(discov): prevent unbounded memory growth on duplicate etcd PUT events (#5580) --- core/discov/internal/registry.go | 22 +++++--- core/discov/internal/registry_test.go | 46 +++++++++++++++- core/discov/subscriber.go | 13 ++++- core/discov/subscriber_test.go | 75 +++++++++++++++++++++++++++ 4 files changed, 148 insertions(+), 8 deletions(-) diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index c4ab41170..e4ad0e963 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -263,14 +263,24 @@ func (c *cluster) handleWatchEvents(ctx context.Context, key watchKey, events [] for _, ev := range events { switch ev.Type { case clientv3.EventTypePut: + evKey := string(ev.Kv.Key) + evVal := string(ev.Kv.Value) c.lock.Lock() - watcher.values[string(ev.Kv.Key)] = string(ev.Kv.Value) + oldVal, exists := watcher.values[evKey] + watcher.values[evKey] = evVal c.lock.Unlock() + if exists && oldVal == evVal { + // duplicate PUT with same value, skip to prevent unbounded growth + continue + } + if exists { + // key moved to a new value, notify delete of old entry first + for _, l := range listeners { + l.OnDelete(KV{Key: evKey, Val: oldVal}) + } + } for _, l := range listeners { - l.OnAdd(KV{ - Key: string(ev.Kv.Key), - Val: string(ev.Kv.Value), - }) + l.OnAdd(KV{Key: evKey, Val: evVal}) } case clientv3.EventTypeDelete: c.lock.Lock() @@ -433,7 +443,7 @@ func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.C } ctx, cancel := context.WithCancel(cli.Ctx()) - + c.lock.Lock() if watcher, ok := c.watchers[key]; ok { watcher.cancel = cancel diff --git a/core/discov/internal/registry_test.go b/core/discov/internal/registry_test.go index 21f7e079e..0d5f6c806 100644 --- a/core/discov/internal/registry_test.go +++ b/core/discov/internal/registry_test.go @@ -517,7 +517,7 @@ func TestCluster_ConcurrentMonitor(t *testing.T) { go func() { defer wg.Done() key := keys[idx%len(keys)] - + if idx%2 == 0 { // Half the goroutines add listeners (write operation) c.addListener(key, &mockListener{}) @@ -543,6 +543,50 @@ func TestCluster_ConcurrentMonitor(t *testing.T) { close(c.done) } +func TestCluster_handleWatchEvents_DuplicatePut(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + listener := NewMockUpdateListener(ctrl) + // OnAdd must be called exactly once despite two PUT events with the same key+value. + listener.EXPECT().OnAdd(KV{Key: "hello", Val: "world"}).Times(1) + + c := newCluster([]string{"any"}) + key := watchKey{key: "any"} + c.watchers[key] = &watchValue{ + listeners: []UpdateListener{listener}, + values: make(map[string]string), + } + events := []*clientv3.Event{ + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world")}}, + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world")}}, + } + c.handleWatchEvents(context.Background(), key, events) +} + +func TestCluster_handleWatchEvents_ValueChange(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + listener := NewMockUpdateListener(ctrl) + gomock.InOrder( + listener.EXPECT().OnAdd(KV{Key: "hello", Val: "world1"}), + listener.EXPECT().OnDelete(KV{Key: "hello", Val: "world1"}), + listener.EXPECT().OnAdd(KV{Key: "hello", Val: "world2"}), + ) + + c := newCluster([]string{"any"}) + key := watchKey{key: "any"} + c.watchers[key] = &watchValue{ + listeners: []UpdateListener{listener}, + values: make(map[string]string), + } + c.handleWatchEvents(context.Background(), key, []*clientv3.Event{ + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world1")}}, + }) + c.handleWatchEvents(context.Background(), key, []*clientv3.Event{ + {Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world2")}}, + }) +} + type mockListener struct { } diff --git a/core/discov/subscriber.go b/core/discov/subscriber.go index 3b7b43164..a5f5ae4a4 100644 --- a/core/discov/subscriber.go +++ b/core/discov/subscriber.go @@ -141,12 +141,23 @@ func (c *container) addKv(key, value string) ([]string, bool) { c.lock.Lock() defer c.lock.Unlock() + oldVal, alreadyMapped := c.mapping[key] + if alreadyMapped && oldVal == value { + // duplicate PUT with same key+value, nothing to do + return nil, false + } + c.dirty.Set(true) + if alreadyMapped { + // key moved to a new value; remove stale entry to prevent leak + c.doRemoveKey(key) + } + keys := c.values[value] previous := append([]string(nil), keys...) early := len(keys) > 0 if c.exclusive && early { - for _, each := range keys { + for _, each := range previous { c.doRemoveKey(each) } } diff --git a/core/discov/subscriber_test.go b/core/discov/subscriber_test.go index 4c5c355eb..0c4d3c147 100644 --- a/core/discov/subscriber_test.go +++ b/core/discov/subscriber_test.go @@ -201,6 +201,81 @@ func TestContainer(t *testing.T) { } } +func TestContainer_DuplicateAdd(t *testing.T) { + c := newContainer(false) + // Simulate 100 duplicate PUT events for the same key+value. + for i := 0; i < 100; i++ { + c.OnAdd(internal.KV{Key: "etcd-key", Val: "host:1234"}) + } + assert.ElementsMatch(t, []string{"host:1234"}, c.GetValues()) + // Internal slice must not have grown beyond one entry. + c.lock.Lock() + assert.Len(t, c.values["host:1234"], 1) + c.lock.Unlock() +} + +func TestContainer_KeyValueChange(t *testing.T) { + c := newContainer(false) + c.OnAdd(internal.KV{Key: "etcd-key", Val: "host:1234"}) + assert.ElementsMatch(t, []string{"host:1234"}, c.GetValues()) + + // Key moves to a different server value. + c.OnAdd(internal.KV{Key: "etcd-key", Val: "host:5678"}) + assert.ElementsMatch(t, []string{"host:5678"}, c.GetValues()) + + // Old server must be fully removed; a subsequent delete must leave nothing. + c.OnDelete(internal.KV{Key: "etcd-key", Val: "host:5678"}) + assert.Empty(t, c.GetValues()) +} + +// TestContainer_ExclusiveMode verifies that adding successive keys for the same +// value in exclusive mode leaves only the latest key and evicts all prior ones. +func TestContainer_ExclusiveMode(t *testing.T) { + c := newContainer(true) + c.OnAdd(internal.KV{Key: "key1", Val: "server1"}) + c.OnAdd(internal.KV{Key: "key2", Val: "server1"}) + c.OnAdd(internal.KV{Key: "key3", Val: "server1"}) + + assert.ElementsMatch(t, []string{"server1"}, c.GetValues()) + c.lock.Lock() + assert.Equal(t, []string{"key3"}, c.values["server1"], "only the latest key must remain") + assert.NotContains(t, c.mapping, "key1", "key1 must have been evicted") + assert.NotContains(t, c.mapping, "key2", "key2 must have been evicted") + assert.Equal(t, "server1", c.mapping["key3"]) + c.lock.Unlock() +} + +// TestContainer_ExclusiveMode_MultipleEvictions injects 3 keys for the same +// value directly into internal state and then triggers the exclusive eviction +// loop via OnAdd. This exercises the range-over-previous fix: iterating over +// the live slice (range keys) would corrupt iteration when doRemoveKey +// compacts the shared underlying array in-place, causing the second and third +// keys to be skipped; ranging over the deep copy (range previous) is safe. +func TestContainer_ExclusiveMode_MultipleEvictions(t *testing.T) { + c := newContainer(true) + + // Bypass the exclusive invariant to simulate 3 pre-existing keys for the + // same value — the state that would expose the in-place aliasing bug. + c.lock.Lock() + c.values["server1"] = []string{"key1", "key2", "key3"} + c.mapping["key1"] = "server1" + c.mapping["key2"] = "server1" + c.mapping["key3"] = "server1" + c.lock.Unlock() + + // Adding key4 must evict all three existing keys via the exclusive loop. + c.OnAdd(internal.KV{Key: "key4", Val: "server1"}) + + assert.ElementsMatch(t, []string{"server1"}, c.GetValues()) + c.lock.Lock() + assert.Equal(t, []string{"key4"}, c.values["server1"], "all prior keys must be evicted") + assert.NotContains(t, c.mapping, "key1", "key1 must be evicted") + assert.NotContains(t, c.mapping, "key2", "key2 must be evicted") + assert.NotContains(t, c.mapping, "key3", "key3 must be evicted") + assert.Equal(t, "server1", c.mapping["key4"]) + c.lock.Unlock() +} + func TestSubscriber(t *testing.T) { sub := new(Subscriber) Exclusive()(sub)