fix(discov): prevent unbounded memory growth on duplicate etcd PUT events (#5580)

This commit is contained in:
Kevin Wan
2026-05-16 12:35:05 +08:00
committed by GitHub
parent 4ad4fd43b7
commit 7b5e7b1c26
4 changed files with 148 additions and 8 deletions

View File

@@ -263,14 +263,24 @@ func (c *cluster) handleWatchEvents(ctx context.Context, key watchKey, events []
for _, ev := range events {
switch ev.Type {
case clientv3.EventTypePut:
evKey := string(ev.Kv.Key)
evVal := string(ev.Kv.Value)
c.lock.Lock()
watcher.values[string(ev.Kv.Key)] = string(ev.Kv.Value)
oldVal, exists := watcher.values[evKey]
watcher.values[evKey] = evVal
c.lock.Unlock()
if exists && oldVal == evVal {
// duplicate PUT with same value, skip to prevent unbounded growth
continue
}
if exists {
// key moved to a new value, notify delete of old entry first
for _, l := range listeners {
l.OnDelete(KV{Key: evKey, Val: oldVal})
}
}
for _, l := range listeners {
l.OnAdd(KV{
Key: string(ev.Kv.Key),
Val: string(ev.Kv.Value),
})
l.OnAdd(KV{Key: evKey, Val: evVal})
}
case clientv3.EventTypeDelete:
c.lock.Lock()
@@ -433,7 +443,7 @@ func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.C
}
ctx, cancel := context.WithCancel(cli.Ctx())
c.lock.Lock()
if watcher, ok := c.watchers[key]; ok {
watcher.cancel = cancel

View File

@@ -517,7 +517,7 @@ func TestCluster_ConcurrentMonitor(t *testing.T) {
go func() {
defer wg.Done()
key := keys[idx%len(keys)]
if idx%2 == 0 {
// Half the goroutines add listeners (write operation)
c.addListener(key, &mockListener{})
@@ -543,6 +543,50 @@ func TestCluster_ConcurrentMonitor(t *testing.T) {
close(c.done)
}
func TestCluster_handleWatchEvents_DuplicatePut(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
listener := NewMockUpdateListener(ctrl)
// OnAdd must be called exactly once despite two PUT events with the same key+value.
listener.EXPECT().OnAdd(KV{Key: "hello", Val: "world"}).Times(1)
c := newCluster([]string{"any"})
key := watchKey{key: "any"}
c.watchers[key] = &watchValue{
listeners: []UpdateListener{listener},
values: make(map[string]string),
}
events := []*clientv3.Event{
{Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world")}},
{Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world")}},
}
c.handleWatchEvents(context.Background(), key, events)
}
func TestCluster_handleWatchEvents_ValueChange(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
listener := NewMockUpdateListener(ctrl)
gomock.InOrder(
listener.EXPECT().OnAdd(KV{Key: "hello", Val: "world1"}),
listener.EXPECT().OnDelete(KV{Key: "hello", Val: "world1"}),
listener.EXPECT().OnAdd(KV{Key: "hello", Val: "world2"}),
)
c := newCluster([]string{"any"})
key := watchKey{key: "any"}
c.watchers[key] = &watchValue{
listeners: []UpdateListener{listener},
values: make(map[string]string),
}
c.handleWatchEvents(context.Background(), key, []*clientv3.Event{
{Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world1")}},
})
c.handleWatchEvents(context.Background(), key, []*clientv3.Event{
{Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("hello"), Value: []byte("world2")}},
})
}
type mockListener struct {
}

View File

@@ -141,12 +141,23 @@ func (c *container) addKv(key, value string) ([]string, bool) {
c.lock.Lock()
defer c.lock.Unlock()
oldVal, alreadyMapped := c.mapping[key]
if alreadyMapped && oldVal == value {
// duplicate PUT with same key+value, nothing to do
return nil, false
}
c.dirty.Set(true)
if alreadyMapped {
// key moved to a new value; remove stale entry to prevent leak
c.doRemoveKey(key)
}
keys := c.values[value]
previous := append([]string(nil), keys...)
early := len(keys) > 0
if c.exclusive && early {
for _, each := range keys {
for _, each := range previous {
c.doRemoveKey(each)
}
}

View File

@@ -201,6 +201,81 @@ func TestContainer(t *testing.T) {
}
}
func TestContainer_DuplicateAdd(t *testing.T) {
c := newContainer(false)
// Simulate 100 duplicate PUT events for the same key+value.
for i := 0; i < 100; i++ {
c.OnAdd(internal.KV{Key: "etcd-key", Val: "host:1234"})
}
assert.ElementsMatch(t, []string{"host:1234"}, c.GetValues())
// Internal slice must not have grown beyond one entry.
c.lock.Lock()
assert.Len(t, c.values["host:1234"], 1)
c.lock.Unlock()
}
func TestContainer_KeyValueChange(t *testing.T) {
c := newContainer(false)
c.OnAdd(internal.KV{Key: "etcd-key", Val: "host:1234"})
assert.ElementsMatch(t, []string{"host:1234"}, c.GetValues())
// Key moves to a different server value.
c.OnAdd(internal.KV{Key: "etcd-key", Val: "host:5678"})
assert.ElementsMatch(t, []string{"host:5678"}, c.GetValues())
// Old server must be fully removed; a subsequent delete must leave nothing.
c.OnDelete(internal.KV{Key: "etcd-key", Val: "host:5678"})
assert.Empty(t, c.GetValues())
}
// TestContainer_ExclusiveMode verifies that adding successive keys for the same
// value in exclusive mode leaves only the latest key and evicts all prior ones.
func TestContainer_ExclusiveMode(t *testing.T) {
c := newContainer(true)
c.OnAdd(internal.KV{Key: "key1", Val: "server1"})
c.OnAdd(internal.KV{Key: "key2", Val: "server1"})
c.OnAdd(internal.KV{Key: "key3", Val: "server1"})
assert.ElementsMatch(t, []string{"server1"}, c.GetValues())
c.lock.Lock()
assert.Equal(t, []string{"key3"}, c.values["server1"], "only the latest key must remain")
assert.NotContains(t, c.mapping, "key1", "key1 must have been evicted")
assert.NotContains(t, c.mapping, "key2", "key2 must have been evicted")
assert.Equal(t, "server1", c.mapping["key3"])
c.lock.Unlock()
}
// TestContainer_ExclusiveMode_MultipleEvictions injects 3 keys for the same
// value directly into internal state and then triggers the exclusive eviction
// loop via OnAdd. This exercises the range-over-previous fix: iterating over
// the live slice (range keys) would corrupt iteration when doRemoveKey
// compacts the shared underlying array in-place, causing the second and third
// keys to be skipped; ranging over the deep copy (range previous) is safe.
func TestContainer_ExclusiveMode_MultipleEvictions(t *testing.T) {
c := newContainer(true)
// Bypass the exclusive invariant to simulate 3 pre-existing keys for the
// same value — the state that would expose the in-place aliasing bug.
c.lock.Lock()
c.values["server1"] = []string{"key1", "key2", "key3"}
c.mapping["key1"] = "server1"
c.mapping["key2"] = "server1"
c.mapping["key3"] = "server1"
c.lock.Unlock()
// Adding key4 must evict all three existing keys via the exclusive loop.
c.OnAdd(internal.KV{Key: "key4", Val: "server1"})
assert.ElementsMatch(t, []string{"server1"}, c.GetValues())
c.lock.Lock()
assert.Equal(t, []string{"key4"}, c.values["server1"], "all prior keys must be evicted")
assert.NotContains(t, c.mapping, "key1", "key1 must be evicted")
assert.NotContains(t, c.mapping, "key2", "key2 must be evicted")
assert.NotContains(t, c.mapping, "key3", "key3 must be evicted")
assert.Equal(t, "server1", c.mapping["key4"])
c.lock.Unlock()
}
func TestSubscriber(t *testing.T) {
sub := new(Subscriber)
Exclusive()(sub)