From 5f516ed135118ee721021c5722b62f3ea5c8cd2e Mon Sep 17 00:00:00 2001 From: Michael Suchacz <203725896+ibetitsmike@users.noreply.github.com> Date: Tue, 6 May 2025 16:00:16 +0200 Subject: [PATCH] feat: improve coder connect tunnel handling on reconnect (#17598) Closes https://github.com/coder/internal/issues/563 The [Coder Connect tunnel](https://github.com/coder/coder/blob/main/vpn/tunnel.go) receives workspace state from the Coder server over a [dRPC stream.](https://github.com/coder/coder/blob/114ba4593b2a82dfd41cdcb7fd6eb70d866e7b86/tailnet/controllers.go#L1029) When first connecting to this stream, the current state of the user's workspaces is received, with subsequent messages being diffs on top of that state. However, if the client disconnects from this stream, such as when the user's device is suspended, and then reconnects later, no mechanism exists for the tunnel to differentiate that message containing the entire initial state from another diff, and so that state is incorrectly applied as a diff. In practice: - Tunnel connects, receives a workspace update containing all the existing workspaces & agents. - Tunnel loses connection, but isn't completely stopped. - All the user's workspaces are restarted, producing a new set of agents. - Tunnel regains connection, and receives a workspace update containing all the existing workspaces & agents. - This initial update is incorrectly applied as a diff, with the Tunnel's state containing both the old & new agents. This PR introduces a solution in which tunnelUpdater, when created, sends a FreshState flag with the WorkspaceUpdate type. This flag is handled in the vpn tunnel in the following fashion: - Preserve existing Agents - Remove current Agents in the tunnel that are not present in the WorkspaceUpdate - Remove unreferenced Workspaces --- tailnet/controllers.go | 32 ++- tailnet/controllers_test.go | 9 +- vpn/tunnel.go | 77 +++++-- vpn/tunnel_internal_test.go | 396 ++++++++++++++++++++++++++++++++++++ 4 files changed, 498 insertions(+), 16 deletions(-) diff --git a/tailnet/controllers.go b/tailnet/controllers.go index 2328e19640..b7d4e246a4 100644 --- a/tailnet/controllers.go +++ b/tailnet/controllers.go @@ -897,6 +897,21 @@ type Workspace struct { agents map[uuid.UUID]*Agent } +func (w *Workspace) Clone() Workspace { + agents := make(map[uuid.UUID]*Agent, len(w.agents)) + for k, v := range w.agents { + clone := v.Clone() + agents[k] = &clone + } + return Workspace{ + ID: w.ID, + Name: w.Name, + Status: w.Status, + ownerUsername: w.ownerUsername, + agents: agents, + } +} + type DNSNameOptions struct { Suffix string } @@ -1049,6 +1064,7 @@ func (t *tunnelUpdater) recvLoop() { t.logger.Debug(context.Background(), "tunnel updater recvLoop started") defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done") defer close(t.recvLoopDone) + updateKind := Snapshot for { update, err := t.client.Recv() if err != nil { @@ -1061,8 +1077,10 @@ func (t *tunnelUpdater) recvLoop() { } t.logger.Debug(context.Background(), "got workspace update", slog.F("workspace_update", update), + slog.F("update_kind", updateKind), ) - err = t.handleUpdate(update) + err = t.handleUpdate(update, updateKind) + updateKind = Diff if err != nil { t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err)) cErr := t.client.Close() @@ -1083,14 +1101,23 @@ type WorkspaceUpdate struct { UpsertedAgents []*Agent DeletedWorkspaces []*Workspace DeletedAgents []*Agent + Kind UpdateKind } +type UpdateKind int + +const ( + Diff UpdateKind = iota + Snapshot +) + func (w *WorkspaceUpdate) Clone() WorkspaceUpdate { clone := WorkspaceUpdate{ UpsertedWorkspaces: make([]*Workspace, len(w.UpsertedWorkspaces)), UpsertedAgents: make([]*Agent, len(w.UpsertedAgents)), DeletedWorkspaces: make([]*Workspace, len(w.DeletedWorkspaces)), DeletedAgents: make([]*Agent, len(w.DeletedAgents)), + Kind: w.Kind, } for i, ws := range w.UpsertedWorkspaces { clone.UpsertedWorkspaces[i] = &Workspace{ @@ -1115,7 +1142,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate { return clone } -func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error { +func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate, updateKind UpdateKind) error { t.Lock() defer t.Unlock() @@ -1124,6 +1151,7 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error { UpsertedAgents: []*Agent{}, DeletedWorkspaces: []*Workspace{}, DeletedAgents: []*Agent{}, + Kind: updateKind, } for _, uw := range update.UpsertedWorkspaces { diff --git a/tailnet/controllers_test.go b/tailnet/controllers_test.go index 67834de462..bb5b543378 100644 --- a/tailnet/controllers_test.go +++ b/tailnet/controllers_test.go @@ -1611,6 +1611,7 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) { }, DeletedWorkspaces: []*tailnet.Workspace{}, DeletedAgents: []*tailnet.Agent{}, + Kind: tailnet.Snapshot, } // And the callback @@ -1626,6 +1627,9 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) { slices.SortFunc(recvState.UpsertedAgents, func(a, b *tailnet.Agent) int { return strings.Compare(a.Name, b.Name) }) + // tunnel is still open, so it's a diff + currentState.Kind = tailnet.Diff + require.Equal(t, currentState, recvState) } @@ -1692,14 +1696,17 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) { }, DeletedWorkspaces: []*tailnet.Workspace{}, DeletedAgents: []*tailnet.Agent{}, + Kind: tailnet.Snapshot, } cbUpdate := testutil.TryReceive(ctx, t, fUH.ch) require.Equal(t, initRecvUp, cbUpdate) - // Current state should match initial state, err := updateCtrl.CurrentState() require.NoError(t, err) + // tunnel is still open, so it's a diff + initRecvUp.Kind = tailnet.Diff + require.Equal(t, initRecvUp, state) // Send update that removes w1a1 and adds w1a2 diff --git a/vpn/tunnel.go b/vpn/tunnel.go index 63de203980..6c71aecaa0 100644 --- a/vpn/tunnel.go +++ b/vpn/tunnel.go @@ -88,6 +88,7 @@ func NewTunnel( netLoopDone: make(chan struct{}), uSendCh: s.sendCh, agents: map[uuid.UUID]tailnet.Agent{}, + workspaces: map[uuid.UUID]tailnet.Workspace{}, clock: quartz.NewReal(), }, } @@ -347,7 +348,9 @@ type updater struct { uSendCh chan<- *TunnelMessage // agents contains the agents that are currently connected to the tunnel. agents map[uuid.UUID]tailnet.Agent - conn Conn + // workspaces contains the workspaces to which agents are currently connected via the tunnel. + workspaces map[uuid.UUID]tailnet.Workspace + conn Conn clock quartz.Clock } @@ -397,6 +400,11 @@ func (u *updater) sendUpdateResponse(req *request[*TunnelMessage, *ManagerMessag // createPeerUpdateLocked creates a PeerUpdate message from a workspace update, populating // the network status of the agents. func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUpdate { + // if the update is a snapshot, we need to process the full state + if update.Kind == tailnet.Snapshot { + processSnapshotUpdate(&update, u.agents, u.workspaces) + } + out := &PeerUpdate{ UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)), UpsertedAgents: make([]*Agent, len(update.UpsertedAgents)), @@ -404,7 +412,20 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp DeletedAgents: make([]*Agent, len(update.DeletedAgents)), } - u.saveUpdateLocked(update) + // save the workspace update to the tunnel's state, such that it can + // be used to populate automated peer updates. + for _, agent := range update.UpsertedAgents { + u.agents[agent.ID] = agent.Clone() + } + for _, agent := range update.DeletedAgents { + delete(u.agents, agent.ID) + } + for _, workspace := range update.UpsertedWorkspaces { + u.workspaces[workspace.ID] = workspace.Clone() + } + for _, workspace := range update.DeletedWorkspaces { + delete(u.workspaces, workspace.ID) + } for i, ws := range update.UpsertedWorkspaces { out.UpsertedWorkspaces[i] = &Workspace{ @@ -413,6 +434,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp Status: Workspace_Status(ws.Status), } } + upsertedAgents := u.convertAgentsLocked(update.UpsertedAgents) out.UpsertedAgents = upsertedAgents for i, ws := range update.DeletedWorkspaces { @@ -472,17 +494,6 @@ func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent { return out } -// saveUpdateLocked saves the workspace update to the tunnel's state, such that it can -// be used to populate automated peer updates. -func (u *updater) saveUpdateLocked(update tailnet.WorkspaceUpdate) { - for _, agent := range update.UpsertedAgents { - u.agents[agent.ID] = agent.Clone() - } - for _, agent := range update.DeletedAgents { - delete(u.agents, agent.ID) - } -} - // setConn sets the `conn` and returns false if there's already a connection set. func (u *updater) setConn(conn Conn) bool { u.mu.Lock() @@ -552,6 +563,46 @@ func (u *updater) netStatusLoop() { } } +// processSnapshotUpdate handles the logic when a full state update is received. +// When the tunnel is live, we only receive diffs, but the first packet on any given +// reconnect to the tailnet API is a full state. +// Without this logic we weren't processing deletes for any workspaces or agents deleted +// while the client was disconnected while the computer was asleep. +func processSnapshotUpdate(update *tailnet.WorkspaceUpdate, agents map[uuid.UUID]tailnet.Agent, workspaces map[uuid.UUID]tailnet.Workspace) { + // ignoredWorkspaces is initially populated with the workspaces that are + // in the current update. Later on we populate it with the deleted workspaces too + // so that we don't send duplicate updates. Same applies to ignoredAgents. + ignoredWorkspaces := make(map[uuid.UUID]struct{}, len(update.UpsertedWorkspaces)) + ignoredAgents := make(map[uuid.UUID]struct{}, len(update.UpsertedAgents)) + + for _, workspace := range update.UpsertedWorkspaces { + ignoredWorkspaces[workspace.ID] = struct{}{} + } + for _, agent := range update.UpsertedAgents { + ignoredAgents[agent.ID] = struct{}{} + } + for _, agent := range agents { + if _, present := ignoredAgents[agent.ID]; !present { + // delete any current agents that are not in the new update + update.DeletedAgents = append(update.DeletedAgents, &tailnet.Agent{ + ID: agent.ID, + Name: agent.Name, + WorkspaceID: agent.WorkspaceID, + }) + } + } + for _, workspace := range workspaces { + if _, present := ignoredWorkspaces[workspace.ID]; !present { + update.DeletedWorkspaces = append(update.DeletedWorkspaces, &tailnet.Workspace{ + ID: workspace.ID, + Name: workspace.Name, + Status: workspace.Status, + }) + ignoredWorkspaces[workspace.ID] = struct{}{} + } + } +} + // hostsToIPStrings returns a slice of all unique IP addresses in the values // of the given map. func hostsToIPStrings(hosts map[dnsname.FQDN][]netip.Addr) []string { diff --git a/vpn/tunnel_internal_test.go b/vpn/tunnel_internal_test.go index d1d7377361..2beba66d7a 100644 --- a/vpn/tunnel_internal_test.go +++ b/vpn/tunnel_internal_test.go @@ -2,6 +2,7 @@ package vpn import ( "context" + "maps" "net" "net/netip" "net/url" @@ -292,6 +293,7 @@ func TestUpdater_createPeerUpdate(t *testing.T) { ctx: ctx, netLoopDone: make(chan struct{}), agents: map[uuid.UUID]tailnet.Agent{}, + workspaces: map[uuid.UUID]tailnet.Workspace{}, conn: newFakeConn(tailnet.WorkspaceUpdate{}, hsTime), } @@ -486,6 +488,212 @@ func TestTunnel_sendAgentUpdate(t *testing.T) { require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime()) } +func TestTunnel_sendAgentUpdateReconnect(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + + mClock := quartz.NewMock(t) + + wID1 := uuid.UUID{1} + aID1 := uuid.UUID{2} + aID2 := uuid.UUID{3} + + hsTime := time.Now().Add(-time.Minute).UTC() + + client := newFakeClient(ctx, t) + conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime) + + tun, mgr := setupTunnel(t, ctx, client, mClock) + errCh := make(chan error, 1) + var resp *TunnelMessage + go func() { + r, err := mgr.unaryRPC(ctx, &ManagerMessage{ + Msg: &ManagerMessage_Start{ + Start: &StartRequest{ + TunnelFileDescriptor: 2, + CoderUrl: "https://coder.example.com", + ApiToken: "fakeToken", + }, + }, + }) + resp = r + errCh <- err + }() + testutil.RequireSend(ctx, t, client.ch, conn) + err := testutil.TryReceive(ctx, t, errCh) + require.NoError(t, err) + _, ok := resp.Msg.(*TunnelMessage_Start) + require.True(t, ok) + + // Inform the tunnel of the initial state + err = tun.Update(tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + { + ID: wID1, Name: "w1", Status: proto.Workspace_STARTING, + }, + }, + UpsertedAgents: []*tailnet.Agent{ + { + ID: aID1, + Name: "agent1", + WorkspaceID: wID1, + Hosts: map[dnsname.FQDN][]netip.Addr{ + "agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, + }, + }, + }, + }) + require.NoError(t, err) + req := testutil.TryReceive(ctx, t, mgr.requests) + require.Nil(t, req.msg.Rpc) + require.NotNil(t, req.msg.GetPeerUpdate()) + require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1) + require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id) + + // Upsert a new agent simulating a reconnect + err = tun.Update(tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + { + ID: wID1, Name: "w1", Status: proto.Workspace_STARTING, + }, + }, + UpsertedAgents: []*tailnet.Agent{ + { + ID: aID2, + Name: "agent2", + WorkspaceID: wID1, + Hosts: map[dnsname.FQDN][]netip.Addr{ + "agent2.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, + }, + }, + }, + Kind: tailnet.Snapshot, + }) + require.NoError(t, err) + + // The new update only contains the new agent + mClock.AdvanceNext() + req = testutil.TryReceive(ctx, t, mgr.requests) + require.Nil(t, req.msg.Rpc) + peerUpdate := req.msg.GetPeerUpdate() + require.NotNil(t, peerUpdate) + require.Len(t, peerUpdate.UpsertedAgents, 1) + require.Len(t, peerUpdate.DeletedAgents, 1) + require.Len(t, peerUpdate.DeletedWorkspaces, 0) + + require.Equal(t, aID2[:], peerUpdate.UpsertedAgents[0].Id) + require.Equal(t, hsTime, peerUpdate.UpsertedAgents[0].LastHandshake.AsTime()) + + require.Equal(t, aID1[:], peerUpdate.DeletedAgents[0].Id) +} + +func TestTunnel_sendAgentUpdateWorkspaceReconnect(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + + mClock := quartz.NewMock(t) + + wID1 := uuid.UUID{1} + wID2 := uuid.UUID{2} + aID1 := uuid.UUID{3} + aID3 := uuid.UUID{4} + + hsTime := time.Now().Add(-time.Minute).UTC() + + client := newFakeClient(ctx, t) + conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime) + + tun, mgr := setupTunnel(t, ctx, client, mClock) + errCh := make(chan error, 1) + var resp *TunnelMessage + go func() { + r, err := mgr.unaryRPC(ctx, &ManagerMessage{ + Msg: &ManagerMessage_Start{ + Start: &StartRequest{ + TunnelFileDescriptor: 2, + CoderUrl: "https://coder.example.com", + ApiToken: "fakeToken", + }, + }, + }) + resp = r + errCh <- err + }() + testutil.RequireSend(ctx, t, client.ch, conn) + err := testutil.TryReceive(ctx, t, errCh) + require.NoError(t, err) + _, ok := resp.Msg.(*TunnelMessage_Start) + require.True(t, ok) + + // Inform the tunnel of the initial state + err = tun.Update(tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + { + ID: wID1, Name: "w1", Status: proto.Workspace_STARTING, + }, + }, + UpsertedAgents: []*tailnet.Agent{ + { + ID: aID1, + Name: "agent1", + WorkspaceID: wID1, + Hosts: map[dnsname.FQDN][]netip.Addr{ + "agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, + }, + }, + }, + }) + require.NoError(t, err) + req := testutil.TryReceive(ctx, t, mgr.requests) + require.Nil(t, req.msg.Rpc) + require.NotNil(t, req.msg.GetPeerUpdate()) + require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1) + require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id) + + // Upsert a new agent with a new workspace while simulating a reconnect + err = tun.Update(tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + { + ID: wID2, Name: "w2", Status: proto.Workspace_STARTING, + }, + }, + UpsertedAgents: []*tailnet.Agent{ + { + ID: aID3, + Name: "agent3", + WorkspaceID: wID2, + Hosts: map[dnsname.FQDN][]netip.Addr{ + "agent3.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, + }, + }, + }, + Kind: tailnet.Snapshot, + }) + require.NoError(t, err) + + // The new update only contains the new agent + mClock.AdvanceNext() + req = testutil.TryReceive(ctx, t, mgr.requests) + mClock.AdvanceNext() + + require.Nil(t, req.msg.Rpc) + peerUpdate := req.msg.GetPeerUpdate() + require.NotNil(t, peerUpdate) + require.Len(t, peerUpdate.UpsertedWorkspaces, 1) + require.Len(t, peerUpdate.UpsertedAgents, 1) + require.Len(t, peerUpdate.DeletedAgents, 1) + require.Len(t, peerUpdate.DeletedWorkspaces, 1) + + require.Equal(t, wID2[:], peerUpdate.UpsertedWorkspaces[0].Id) + require.Equal(t, aID3[:], peerUpdate.UpsertedAgents[0].Id) + require.Equal(t, hsTime, peerUpdate.UpsertedAgents[0].LastHandshake.AsTime()) + + require.Equal(t, aID1[:], peerUpdate.DeletedAgents[0].Id) + require.Equal(t, wID1[:], peerUpdate.DeletedWorkspaces[0].Id) +} + //nolint:revive // t takes precedence func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient, mClock quartz.Clock) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) { mp, tp := net.Pipe() @@ -513,3 +721,191 @@ func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient, mClock q mgr.start() return tun, mgr } + +func TestProcessFreshState(t *testing.T) { + t.Parallel() + + wsID1 := uuid.New() + wsID2 := uuid.New() + wsID3 := uuid.New() + wsID4 := uuid.New() + + agentID1 := uuid.New() + agentID2 := uuid.New() + agentID3 := uuid.New() + agentID4 := uuid.New() + + agent1 := tailnet.Agent{ID: agentID1, Name: "agent1", WorkspaceID: wsID1} + agent2 := tailnet.Agent{ID: agentID2, Name: "agent2", WorkspaceID: wsID2} + agent3 := tailnet.Agent{ID: agentID3, Name: "agent3", WorkspaceID: wsID3} + agent4 := tailnet.Agent{ID: agentID4, Name: "agent4", WorkspaceID: wsID1} + + ws1 := tailnet.Workspace{ID: wsID1, Name: "ws1", Status: proto.Workspace_RUNNING} + ws2 := tailnet.Workspace{ID: wsID2, Name: "ws2", Status: proto.Workspace_RUNNING} + ws3 := tailnet.Workspace{ID: wsID3, Name: "ws3", Status: proto.Workspace_RUNNING} + ws4 := tailnet.Workspace{ID: wsID4, Name: "ws4", Status: proto.Workspace_RUNNING} + + initialAgents := map[uuid.UUID]tailnet.Agent{ + agentID1: agent1, + agentID2: agent2, + agentID4: agent4, + } + initialWorkspaces := map[uuid.UUID]tailnet.Workspace{ + wsID1: ws1, + wsID2: ws2, + } + + tests := []struct { + name string + initialAgents map[uuid.UUID]tailnet.Agent + initialWorkspaces map[uuid.UUID]tailnet.Workspace + update *tailnet.WorkspaceUpdate + expectedDelete *tailnet.WorkspaceUpdate // We only care about deletions added by the function + }{ + { + name: "NoChange", + initialAgents: initialAgents, + initialWorkspaces: initialWorkspaces, + update: &tailnet.WorkspaceUpdate{ + Kind: tailnet.Snapshot, + UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2}, + UpsertedAgents: []*tailnet.Agent{&agent1, &agent2, &agent4}, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + expectedDelete: &tailnet.WorkspaceUpdate{ // Expect no *additional* deletions + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + }, + { + name: "AgentAdded", // Agent 3 added in update + initialAgents: initialAgents, + initialWorkspaces: initialWorkspaces, + update: &tailnet.WorkspaceUpdate{ + Kind: tailnet.Snapshot, + UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2, &ws3}, + UpsertedAgents: []*tailnet.Agent{&agent1, &agent2, &agent3, &agent4}, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + expectedDelete: &tailnet.WorkspaceUpdate{ + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + }, + { + name: "AgentRemovedWorkspaceAlsoRemoved", // Agent 2 removed, ws2 also removed + initialAgents: initialAgents, + initialWorkspaces: initialWorkspaces, + update: &tailnet.WorkspaceUpdate{ + Kind: tailnet.Snapshot, + UpsertedWorkspaces: []*tailnet.Workspace{&ws1}, // ws2 not present + UpsertedAgents: []*tailnet.Agent{&agent1, &agent4}, // agent2 not present + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + expectedDelete: &tailnet.WorkspaceUpdate{ + DeletedWorkspaces: []*tailnet.Workspace{ + {ID: wsID2, Name: "ws2", Status: proto.Workspace_RUNNING}, + }, // Expect ws2 to be deleted + DeletedAgents: []*tailnet.Agent{ // Expect agent2 to be deleted + {ID: agentID2, Name: "agent2", WorkspaceID: wsID2}, + }, + }, + }, + { + name: "AgentRemovedWorkspaceStays", // Agent 4 removed, but ws1 stays (due to agent1) + initialAgents: initialAgents, + initialWorkspaces: initialWorkspaces, + update: &tailnet.WorkspaceUpdate{ + Kind: tailnet.Snapshot, + UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2}, // ws1 still present + UpsertedAgents: []*tailnet.Agent{&agent1, &agent2}, // agent4 not present + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + expectedDelete: &tailnet.WorkspaceUpdate{ + DeletedWorkspaces: []*tailnet.Workspace{}, // ws1 should NOT be deleted + DeletedAgents: []*tailnet.Agent{ // Expect agent4 to be deleted + {ID: agentID4, Name: "agent4", WorkspaceID: wsID1}, + }, + }, + }, + { + name: "InitialAgentsEmpty", + initialAgents: map[uuid.UUID]tailnet.Agent{}, // Start with no agents known + initialWorkspaces: map[uuid.UUID]tailnet.Workspace{}, + update: &tailnet.WorkspaceUpdate{ + Kind: tailnet.Snapshot, + UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2}, + UpsertedAgents: []*tailnet.Agent{&agent1, &agent2}, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + expectedDelete: &tailnet.WorkspaceUpdate{ // Expect no deletions added + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + }, + { + name: "UpdateEmpty", // Snapshot says nothing exists + initialAgents: initialAgents, + initialWorkspaces: initialWorkspaces, + update: &tailnet.WorkspaceUpdate{ + Kind: tailnet.Snapshot, + UpsertedWorkspaces: []*tailnet.Workspace{}, + UpsertedAgents: []*tailnet.Agent{}, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + expectedDelete: &tailnet.WorkspaceUpdate{ // Expect all initial agents/workspaces to be deleted + DeletedWorkspaces: []*tailnet.Workspace{ + {ID: wsID1, Name: "ws1", Status: proto.Workspace_RUNNING}, + {ID: wsID2, Name: "ws2", Status: proto.Workspace_RUNNING}, + }, // ws1 and ws2 deleted + DeletedAgents: []*tailnet.Agent{ // agent1, agent2, agent4 deleted + {ID: agentID1, Name: "agent1", WorkspaceID: wsID1}, + {ID: agentID2, Name: "agent2", WorkspaceID: wsID2}, + {ID: agentID4, Name: "agent4", WorkspaceID: wsID1}, + }, + }, + }, + { + name: "WorkspaceWithNoAgents", // Snapshot says nothing exists + initialAgents: initialAgents, + initialWorkspaces: map[uuid.UUID]tailnet.Workspace{wsID1: ws1, wsID2: ws2, wsID4: ws4}, // ws4 has no agents + update: &tailnet.WorkspaceUpdate{ + Kind: tailnet.Snapshot, + UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2}, + UpsertedAgents: []*tailnet.Agent{&agent1, &agent2, &agent4}, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, + expectedDelete: &tailnet.WorkspaceUpdate{ // Expect all initial agents/workspaces to be deleted + DeletedWorkspaces: []*tailnet.Workspace{ + {ID: wsID4, Name: "ws4", Status: proto.Workspace_RUNNING}, + }, // ws4 should be deleted + DeletedAgents: []*tailnet.Agent{}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + agentsCopy := make(map[uuid.UUID]tailnet.Agent) + maps.Copy(agentsCopy, tt.initialAgents) + + workspaceCopy := make(map[uuid.UUID]tailnet.Workspace) + maps.Copy(workspaceCopy, tt.initialWorkspaces) + + processSnapshotUpdate(tt.update, agentsCopy, workspaceCopy) + + require.ElementsMatch(t, tt.expectedDelete.DeletedAgents, tt.update.DeletedAgents, "DeletedAgents mismatch") + require.ElementsMatch(t, tt.expectedDelete.DeletedWorkspaces, tt.update.DeletedWorkspaces, "DeletedWorkspaces mismatch") + }) + } +}