From 8701dbc87459e65d3a5ba20784d80740c686d752 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 11 Jan 2024 09:29:42 +0400 Subject: [PATCH] chore: add nodeUpdater to tailnet (#11539) Adds a nodeUpdater component, which serves a similar role to configMaps, but tracks information from tailscale going out to the coordinator as node updates. This first PR just handles netInfo, subsequent PRs will handle DERP forced websockets, endpoints, and addresses. --- tailnet/configmaps.go | 10 ++- tailnet/configmaps_internal_test.go | 10 +-- tailnet/node.go | 134 ++++++++++++++++++++++++++++ tailnet/node_internal_test.go | 110 +++++++++++++++++++++++ 4 files changed, 256 insertions(+), 8 deletions(-) create mode 100644 tailnet/node.go create mode 100644 tailnet/node_internal_test.go diff --git a/tailnet/configmaps.go b/tailnet/configmaps.go index 2e5e019bf2..028ba7dfff 100644 --- a/tailnet/configmaps.go +++ b/tailnet/configmaps.go @@ -48,13 +48,17 @@ const ( closed ) -type configMaps struct { +type phased struct { sync.Cond + phase phase +} + +type configMaps struct { + phased netmapDirty bool derpMapDirty bool filterDirty bool closing bool - phase phase engine engineConfigurable static netmap.NetworkMap @@ -71,7 +75,7 @@ type configMaps struct { func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps { pubKey := nodeKey.Public() c := &configMaps{ - Cond: *(sync.NewCond(&sync.Mutex{})), + phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))}, logger: logger, engine: engine, static: netmap.NetworkMap{ diff --git a/tailnet/configmaps_internal_test.go b/tailnet/configmaps_internal_test.go index 003ac1b522..334bc43017 100644 --- a/tailnet/configmaps_internal_test.go +++ b/tailnet/configmaps_internal_test.go @@ -96,7 +96,7 @@ func TestConfigMaps_setAddresses_same(t *testing.T) { uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), addrs) defer uut.close() - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) uut.setAddresses(addrs) @@ -190,7 +190,7 @@ func TestConfigMaps_updatePeers_same(t *testing.T) { defer uut.close() // Then: we don't configure - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) p1ID := uuid.UUID{1} p1Node := newTestNode(1) @@ -558,7 +558,7 @@ func TestConfigMaps_setBlockEndpoints_same(t *testing.T) { uut.L.Unlock() // Then: we don't configure - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) // When we set blockEndpoints to true uut.setBlockEndpoints(true) @@ -619,7 +619,7 @@ func TestConfigMaps_updatePeers_nonexist(t *testing.T) { defer uut.close() // Then: we don't configure - requireNeverConfigures(ctx, t, uut) + requireNeverConfigures(ctx, t, &uut.phased) // Given: no known peers go func() { @@ -669,7 +669,7 @@ func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tail return nil } -func requireNeverConfigures(ctx context.Context, t *testing.T, uut *configMaps) { +func requireNeverConfigures(ctx context.Context, t *testing.T, uut *phased) { t.Helper() waiting := make(chan struct{}) go func() { diff --git a/tailnet/node.go b/tailnet/node.go new file mode 100644 index 0000000000..a9912154d6 --- /dev/null +++ b/tailnet/node.go @@ -0,0 +1,134 @@ +package tailnet + +import ( + "context" + "net/netip" + "sync" + + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" + "tailscale.com/tailcfg" + "tailscale.com/types/key" + + "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/database/dbtime" +) + +type nodeUpdater struct { + phased + dirty bool + closing bool + + // static + logger slog.Logger + id tailcfg.NodeID + key key.NodePublic + discoKey key.DiscoPublic + callback func(n *Node) + + // dynamic + preferredDERP int + derpLatency map[string]float64 + derpForcedWebsockets map[int]string + endpoints []string + addresses []netip.Prefix +} + +// updateLoop waits until the config is dirty and then calls the callback with the newest node. +// It is intended only to be called internally, and shuts down when close() is called. +func (u *nodeUpdater) updateLoop() { + u.L.Lock() + defer u.L.Unlock() + defer func() { + u.phase = closed + u.Broadcast() + }() + for { + for !(u.closing || u.dirty) { + u.phase = idle + u.Wait() + } + if u.closing { + return + } + node := u.nodeLocked() + u.dirty = false + u.phase = configuring + u.Broadcast() + + // We cannot reach nodes without DERP for discovery. Therefore, there is no point in sending + // the node without this, and we can save ourselves from churn in the tailscale/wireguard + // layer. + if node.PreferredDERP == 0 { + u.logger.Debug(context.Background(), "skipped sending node; no PreferredDERP", slog.F("node", node)) + continue + } + + u.L.Unlock() + u.callback(node) + u.L.Lock() + } +} + +// close closes the nodeUpdate and stops it calling the node callback +func (u *nodeUpdater) close() { + u.L.Lock() + defer u.L.Unlock() + u.closing = true + u.Broadcast() + for u.phase != closed { + u.Wait() + } +} + +func newNodeUpdater( + logger slog.Logger, callback func(n *Node), + id tailcfg.NodeID, np key.NodePublic, dp key.DiscoPublic, +) *nodeUpdater { + u := &nodeUpdater{ + phased: phased{Cond: *(sync.NewCond(&sync.Mutex{}))}, + logger: logger, + id: id, + key: np, + discoKey: dp, + callback: callback, + } + go u.updateLoop() + return u +} + +// nodeLocked returns the current best node information. u.L must be held. +func (u *nodeUpdater) nodeLocked() *Node { + return &Node{ + ID: u.id, + AsOf: dbtime.Now(), + Key: u.key, + Addresses: slices.Clone(u.addresses), + AllowedIPs: slices.Clone(u.addresses), + DiscoKey: u.discoKey, + Endpoints: slices.Clone(u.endpoints), + PreferredDERP: u.preferredDERP, + DERPLatency: maps.Clone(u.derpLatency), + DERPForcedWebsocket: maps.Clone(u.derpForcedWebsockets), + } +} + +// setNetInfo processes a NetInfo update from the wireguard engine. c.L MUST +// NOT be held. +func (u *nodeUpdater) setNetInfo(ni *tailcfg.NetInfo) { + u.L.Lock() + defer u.L.Unlock() + dirty := false + if u.preferredDERP != ni.PreferredDERP { + dirty = true + u.preferredDERP = ni.PreferredDERP + } + if !maps.Equal(u.derpLatency, ni.DERPLatency) { + dirty = true + u.derpLatency = ni.DERPLatency + } + if dirty { + u.dirty = true + u.Broadcast() + } +} diff --git a/tailnet/node_internal_test.go b/tailnet/node_internal_test.go new file mode 100644 index 0000000000..27dc5609d1 --- /dev/null +++ b/tailnet/node_internal_test.go @@ -0,0 +1,110 @@ +package tailnet + +import ( + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + "tailscale.com/tailcfg" + "tailscale.com/types/key" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/testutil" +) + +func TestNodeUpdater_setNetInfo_different(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + id := tailcfg.NodeID(1) + nodeKey := key.NewNode().Public() + discoKey := key.NewDisco().Public() + nodeCh := make(chan *Node) + goCh := make(chan struct{}) + uut := newNodeUpdater( + logger, + func(n *Node) { + nodeCh <- n + <-goCh + }, + id, nodeKey, discoKey, + ) + defer uut.close() + + dl := map[string]float64{"1": 0.025} + uut.setNetInfo(&tailcfg.NetInfo{ + PreferredDERP: 1, + DERPLatency: dl, + }) + + node := testutil.RequireRecvCtx(ctx, t, nodeCh) + require.Equal(t, nodeKey, node.Key) + require.Equal(t, discoKey, node.DiscoKey) + require.Equal(t, 1, node.PreferredDERP) + require.True(t, maps.Equal(dl, node.DERPLatency)) + + // Send in second update to test getting updates in the middle of the + // callback + uut.setNetInfo(&tailcfg.NetInfo{ + PreferredDERP: 2, + DERPLatency: dl, + }) + close(goCh) // allows callback to complete + + node = testutil.RequireRecvCtx(ctx, t, nodeCh) + require.Equal(t, nodeKey, node.Key) + require.Equal(t, discoKey, node.DiscoKey) + require.Equal(t, 2, node.PreferredDERP) + require.True(t, maps.Equal(dl, node.DERPLatency)) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +} + +func TestNodeUpdater_setNetInfo_same(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + id := tailcfg.NodeID(1) + nodeKey := key.NewNode().Public() + discoKey := key.NewDisco().Public() + nodeCh := make(chan *Node) + goCh := make(chan struct{}) + uut := newNodeUpdater( + logger, + func(n *Node) { + nodeCh <- n + <-goCh + }, + id, nodeKey, discoKey, + ) + defer uut.close() + + // Then: we don't configure + requireNeverConfigures(ctx, t, &uut.phased) + + // Given: preferred DERP and latency already set + dl := map[string]float64{"1": 0.025} + uut.L.Lock() + uut.preferredDERP = 1 + uut.derpLatency = maps.Clone(dl) + uut.L.Unlock() + + // When: new update with same info + uut.setNetInfo(&tailcfg.NetInfo{ + PreferredDERP: 1, + DERPLatency: dl, + }) + + done := make(chan struct{}) + go func() { + defer close(done) + uut.close() + }() + _ = testutil.RequireRecvCtx(ctx, t, done) +}