diff --git a/enterprise/tailnet/pgcoord.go b/enterprise/tailnet/pgcoord.go index 4b8268e7a5..08325e567f 100644 --- a/enterprise/tailnet/pgcoord.go +++ b/enterprise/tailnet/pgcoord.go @@ -567,8 +567,9 @@ func (b *binder) handleBindings() { b.logger.Debug(b.ctx, "binder exiting") return case bnd := <-b.bindings: - b.storeBinding(bnd) - b.workQ.enqueue(bnd.bKey) + if b.storeBinding(bnd) { + b.workQ.enqueue(bnd.bKey) + } } } } @@ -642,26 +643,46 @@ func (b *binder) writeOne(bnd binding) error { // storeBinding stores the latest binding, where we interpret kind == DISCONNECTED as removing the binding. This keeps the map // from growing without bound. -func (b *binder) storeBinding(bnd binding) { +func (b *binder) storeBinding(bnd binding) bool { b.mu.Lock() defer b.mu.Unlock() switch bnd.kind { case proto.CoordinateResponse_PeerUpdate_NODE: + old, ok := b.latest[bnd.bKey] + if ok && old.kind == proto.CoordinateResponse_PeerUpdate_NODE && + nodesEqual(old.node, bnd.node) { + return false + } b.latest[bnd.bKey] = bnd case proto.CoordinateResponse_PeerUpdate_DISCONNECTED: delete(b.latest, bnd.bKey) case proto.CoordinateResponse_PeerUpdate_LOST: - // we need to coalesce with the previously stored node, since it must - // be non-nil in the database + // We need to coalesce with the previously stored node, since it + // must be non-nil in the database. old, ok := b.latest[bnd.bKey] if !ok { - // lost before we ever got a node update. No action - return + // Lost before we ever got a node update. No action. + return false } bnd.node = old.node b.latest[bnd.bKey] = bnd } + return true +} + +// nodesEqual compares two proto.Node messages, ignoring the AsOf +// timestamp which changes on every node build even when nothing else +// has changed. +func nodesEqual(a, b *proto.Node) bool { + if a == nil || b == nil { + return a == b + } + //nolint:forcetypeassert + aClone, bClone := gProto.Clone(a).(*proto.Node), gProto.Clone(b).(*proto.Node) + aClone.AsOf = nil + bClone.AsOf = nil + return gProto.Equal(aClone, bClone) } // retrieveBinding gets the latest binding for a key. diff --git a/enterprise/tailnet/pgcoord_internal_test.go b/enterprise/tailnet/pgcoord_internal_test.go index 975e499278..ffb8113110 100644 --- a/enterprise/tailnet/pgcoord_internal_test.go +++ b/enterprise/tailnet/pgcoord_internal_test.go @@ -16,6 +16,7 @@ import ( "go.uber.org/mock/gomock" "golang.org/x/xerrors" gProto "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/slogtest" @@ -518,3 +519,109 @@ func TestWorkQ_Acquire_WrapsAcquireBatch(t *testing.T) { assert.Equal(t, peer, key) q.done(key) } + +func Test_nodesEqual(t *testing.T) { + t.Parallel() + + t.Run("BothNil", func(t *testing.T) { + t.Parallel() + assert.True(t, nodesEqual(nil, nil)) + }) + + t.Run("OneNil", func(t *testing.T) { + t.Parallel() + assert.False(t, nodesEqual(&proto.Node{PreferredDerp: 1}, nil)) + assert.False(t, nodesEqual(nil, &proto.Node{PreferredDerp: 1})) + }) + + t.Run("IgnoresAsOf", func(t *testing.T) { + t.Parallel() + a := &proto.Node{ + PreferredDerp: 1, + AsOf: timestamppb.Now(), + } + b := &proto.Node{ + PreferredDerp: 1, + AsOf: timestamppb.New(time.Now().Add(-time.Hour)), + } + assert.True(t, nodesEqual(a, b)) + // Verify AsOf fields are restored. + assert.NotNil(t, a.AsOf) + assert.NotNil(t, b.AsOf) + }) + + t.Run("DifferentPreferredDERP", func(t *testing.T) { + t.Parallel() + a := &proto.Node{PreferredDerp: 1} + b := &proto.Node{PreferredDerp: 2} + assert.False(t, nodesEqual(a, b)) + }) +} + +func Test_storeBinding(t *testing.T) { + t.Parallel() + + t.Run("SkipsNoop", func(t *testing.T) { + t.Parallel() + + key := bKey(uuid.New()) + node := &proto.Node{PreferredDerp: 1} + + b := &binder{ + latest: make(map[bKey]binding), + } + + bnd := binding{bKey: key, node: node, kind: proto.CoordinateResponse_PeerUpdate_NODE} + + // First store should succeed. + assert.True(t, b.storeBinding(bnd)) + + // Same node (even with different AsOf) should be skipped. + bnd2 := binding{ + bKey: key, + node: &proto.Node{PreferredDerp: 1, AsOf: timestamppb.Now()}, + kind: proto.CoordinateResponse_PeerUpdate_NODE, + } + assert.False(t, b.storeBinding(bnd2)) + }) + + t.Run("AllowsChangedNode", func(t *testing.T) { + t.Parallel() + + key := bKey(uuid.New()) + + b := &binder{ + latest: make(map[bKey]binding), + } + + bnd1 := binding{bKey: key, node: &proto.Node{PreferredDerp: 1}, kind: proto.CoordinateResponse_PeerUpdate_NODE} + assert.True(t, b.storeBinding(bnd1)) + + bnd2 := binding{bKey: key, node: &proto.Node{PreferredDerp: 2}, kind: proto.CoordinateResponse_PeerUpdate_NODE} + assert.True(t, b.storeBinding(bnd2)) + }) + + t.Run("LostToNodeTransition", func(t *testing.T) { + t.Parallel() + + key := bKey(uuid.New()) + + b := &binder{ + latest: make(map[bKey]binding), + } + + node := &proto.Node{PreferredDerp: 1} + + // NODE should enqueue. + bnd1 := binding{bKey: key, node: node, kind: proto.CoordinateResponse_PeerUpdate_NODE} + assert.True(t, b.storeBinding(bnd1)) + + // LOST should enqueue (transitions state). + bnd2 := binding{bKey: key, kind: proto.CoordinateResponse_PeerUpdate_LOST} + assert.True(t, b.storeBinding(bnd2)) + + // NODE again should enqueue (transitioning back from LOST). + bnd3 := binding{bKey: key, node: node, kind: proto.CoordinateResponse_PeerUpdate_NODE} + assert.True(t, b.storeBinding(bnd3)) + }) +}