fix: skip no-op peer updates in pgcoord binder (#24226)

This commit is contained in:
Jon Ayers
2026-05-21 17:59:12 -05:00
committed by GitHub
parent 356bccddc2
commit 269bd0cb8d
2 changed files with 135 additions and 7 deletions
+28 -7
View File
@@ -567,8 +567,9 @@ func (b *binder) handleBindings() {
b.logger.Debug(b.ctx, "binder exiting")
return
case bnd := <-b.bindings:
b.storeBinding(bnd)
b.workQ.enqueue(bnd.bKey)
if b.storeBinding(bnd) {
b.workQ.enqueue(bnd.bKey)
}
}
}
}
@@ -642,26 +643,46 @@ func (b *binder) writeOne(bnd binding) error {
// storeBinding stores the latest binding, where we interpret kind == DISCONNECTED as removing the binding. This keeps the map
// from growing without bound.
func (b *binder) storeBinding(bnd binding) {
func (b *binder) storeBinding(bnd binding) bool {
b.mu.Lock()
defer b.mu.Unlock()
switch bnd.kind {
case proto.CoordinateResponse_PeerUpdate_NODE:
old, ok := b.latest[bnd.bKey]
if ok && old.kind == proto.CoordinateResponse_PeerUpdate_NODE &&
nodesEqual(old.node, bnd.node) {
return false
}
b.latest[bnd.bKey] = bnd
case proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
delete(b.latest, bnd.bKey)
case proto.CoordinateResponse_PeerUpdate_LOST:
// we need to coalesce with the previously stored node, since it must
// be non-nil in the database
// We need to coalesce with the previously stored node, since it
// must be non-nil in the database.
old, ok := b.latest[bnd.bKey]
if !ok {
// lost before we ever got a node update. No action
return
// Lost before we ever got a node update. No action.
return false
}
bnd.node = old.node
b.latest[bnd.bKey] = bnd
}
return true
}
// nodesEqual compares two proto.Node messages, ignoring the AsOf
// timestamp which changes on every node build even when nothing else
// has changed.
func nodesEqual(a, b *proto.Node) bool {
if a == nil || b == nil {
return a == b
}
//nolint:forcetypeassert
aClone, bClone := gProto.Clone(a).(*proto.Node), gProto.Clone(b).(*proto.Node)
aClone.AsOf = nil
bClone.AsOf = nil
return gProto.Equal(aClone, bClone)
}
// retrieveBinding gets the latest binding for a key.
+107
View File
@@ -16,6 +16,7 @@ import (
"go.uber.org/mock/gomock"
"golang.org/x/xerrors"
gProto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"cdr.dev/slog/v3"
"cdr.dev/slog/v3/sloggers/slogtest"
@@ -518,3 +519,109 @@ func TestWorkQ_Acquire_WrapsAcquireBatch(t *testing.T) {
assert.Equal(t, peer, key)
q.done(key)
}
func Test_nodesEqual(t *testing.T) {
t.Parallel()
t.Run("BothNil", func(t *testing.T) {
t.Parallel()
assert.True(t, nodesEqual(nil, nil))
})
t.Run("OneNil", func(t *testing.T) {
t.Parallel()
assert.False(t, nodesEqual(&proto.Node{PreferredDerp: 1}, nil))
assert.False(t, nodesEqual(nil, &proto.Node{PreferredDerp: 1}))
})
t.Run("IgnoresAsOf", func(t *testing.T) {
t.Parallel()
a := &proto.Node{
PreferredDerp: 1,
AsOf: timestamppb.Now(),
}
b := &proto.Node{
PreferredDerp: 1,
AsOf: timestamppb.New(time.Now().Add(-time.Hour)),
}
assert.True(t, nodesEqual(a, b))
// Verify AsOf fields are restored.
assert.NotNil(t, a.AsOf)
assert.NotNil(t, b.AsOf)
})
t.Run("DifferentPreferredDERP", func(t *testing.T) {
t.Parallel()
a := &proto.Node{PreferredDerp: 1}
b := &proto.Node{PreferredDerp: 2}
assert.False(t, nodesEqual(a, b))
})
}
func Test_storeBinding(t *testing.T) {
t.Parallel()
t.Run("SkipsNoop", func(t *testing.T) {
t.Parallel()
key := bKey(uuid.New())
node := &proto.Node{PreferredDerp: 1}
b := &binder{
latest: make(map[bKey]binding),
}
bnd := binding{bKey: key, node: node, kind: proto.CoordinateResponse_PeerUpdate_NODE}
// First store should succeed.
assert.True(t, b.storeBinding(bnd))
// Same node (even with different AsOf) should be skipped.
bnd2 := binding{
bKey: key,
node: &proto.Node{PreferredDerp: 1, AsOf: timestamppb.Now()},
kind: proto.CoordinateResponse_PeerUpdate_NODE,
}
assert.False(t, b.storeBinding(bnd2))
})
t.Run("AllowsChangedNode", func(t *testing.T) {
t.Parallel()
key := bKey(uuid.New())
b := &binder{
latest: make(map[bKey]binding),
}
bnd1 := binding{bKey: key, node: &proto.Node{PreferredDerp: 1}, kind: proto.CoordinateResponse_PeerUpdate_NODE}
assert.True(t, b.storeBinding(bnd1))
bnd2 := binding{bKey: key, node: &proto.Node{PreferredDerp: 2}, kind: proto.CoordinateResponse_PeerUpdate_NODE}
assert.True(t, b.storeBinding(bnd2))
})
t.Run("LostToNodeTransition", func(t *testing.T) {
t.Parallel()
key := bKey(uuid.New())
b := &binder{
latest: make(map[bKey]binding),
}
node := &proto.Node{PreferredDerp: 1}
// NODE should enqueue.
bnd1 := binding{bKey: key, node: node, kind: proto.CoordinateResponse_PeerUpdate_NODE}
assert.True(t, b.storeBinding(bnd1))
// LOST should enqueue (transitions state).
bnd2 := binding{bKey: key, kind: proto.CoordinateResponse_PeerUpdate_LOST}
assert.True(t, b.storeBinding(bnd2))
// NODE again should enqueue (transitioning back from LOST).
bnd3 := binding{bKey: key, node: node, kind: proto.CoordinateResponse_PeerUpdate_NODE}
assert.True(t, b.storeBinding(bnd3))
})
}