Files
coder/vpn/tunnel_internal_test.go
T
Spike Curtis bddb808b25 chore: arrange imports in a standard way (#21452)
Fixes all our Go file imports to match the preferred spec that we've _mostly_ been using. For example:

```
import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/xerrors"
	"gopkg.in/natefinch/lumberjack.v2"

	"cdr.dev/slog/v3"
	"github.com/coder/coder/v2/codersdk/agentsdk"
	"github.com/coder/serpent"
)
```

3 groups: standard library, 3rd party libs, Coder libs.

This PR makes the change across the codebase. The PR in the stack above modifies our formatting to maintain this state of affairs, and is a separate PR so it's possible to review that one in detail.
2026-01-08 15:24:11 +04:00

1173 lines
34 KiB
Go

package vpn
import (
"context"
"maps"
"net"
"net/netip"
"net/url"
"slices"
"strings"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tailcfg"
"tailscale.com/util/dnsname"
maputil "github.com/coder/coder/v2/coderd/util/maps"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
)
// newFakeClient builds a fakeClient bound to ctx and t. Its connection channel
// is buffered with capacity one, so a test can queue a single fakeConn without
// blocking before NewConn is invoked.
func newFakeClient(ctx context.Context, t *testing.T) *fakeClient {
	client := new(fakeClient)
	client.t = t
	client.ctx = ctx
	client.ch = make(chan *fakeConn, 1)
	return client
}
// fakeClient is a test double for Client. Instead of dialing anything, its
// NewConn method hands back whatever fakeConn the test sends on ch.
type fakeClient struct {
// t is the test that owns this client.
t *testing.T
// ctx bounds NewConn: once it is done, NewConn returns ctx.Err().
ctx context.Context
// ch carries the connections that NewConn will return.
ch chan *fakeConn
}
// Compile-time assertion that *fakeClient implements Client.
var _ Client = (*fakeClient)(nil)
// NewConn implements Client. All arguments are ignored: the call blocks until
// the test supplies a connection on f.ch, or until the client's own context is
// done, in which case that context's error is returned.
func (f *fakeClient) NewConn(context.Context, *url.URL, string, *Options) (Conn, error) {
	select {
	case queued := <-f.ch:
		return queued, nil
	case <-f.ctx.Done():
		return nil, f.ctx.Err()
	}
}
// newFakeConn returns a fakeConn that reports state as its workspace state and
// hsTime as the last handshake time of every peer. Pings return immediately
// unless withManualPings is applied afterwards.
func newFakeConn(state tailnet.WorkspaceUpdate, hsTime time.Time) *fakeConn {
	conn := &fakeConn{state: state, hsTime: hsTime}
	conn.closed = make(chan struct{})
	return conn
}
// withManualPings switches the connection into manual mode: every Ping blocks
// until the test sends a value on returnPing. The receiver is returned so the
// call can be chained onto newFakeConn.
func (f *fakeConn) withManualPings() *fakeConn {
	gate := make(chan struct{})
	f.returnPing = gate
	return f
}
// fakeConn is a test double for Conn that serves canned data.
type fakeConn struct {
// state is returned verbatim by CurrentWorkspaceState.
state tailnet.WorkspaceUpdate
// returnPing, when non-nil, makes Ping block until a value is received
// from it or the ping's context is done. It is set by withManualPings.
returnPing chan struct{}
// hsTime is reported by GetPeerDiagnostics as the last WireGuard
// handshake time.
hsTime time.Time
// closed is closed by Close so tests can observe shutdown.
closed chan struct{}
// doClose ensures closed is closed only once across repeated Close calls.
doClose sync.Once
}
// DERPMap returns a fixed map containing a single region, ID 999, named
// "Coder Region".
func (*fakeConn) DERPMap() *tailcfg.DERPMap {
	const regionID = 999
	region := &tailcfg.DERPRegion{
		RegionID:   regionID,
		RegionCode: "zzz",
		RegionName: "Coder Region",
	}
	derpMap := new(tailcfg.DERPMap)
	derpMap.Regions = map[int]*tailcfg.DERPRegion{regionID: region}
	return derpMap
}
// Node returns a fixed node whose preferred DERP region is 999 and whose only
// recorded DERP latency is 0.1 under the key "999".
func (*fakeConn) Node() *tailnet.Node {
	latencies := map[string]float64{"999": 0.1}
	return &tailnet.Node{PreferredDERP: 999, DERPLatency: latencies}
}
// Compile-time assertion that *fakeConn implements Conn.
var _ Conn = (*fakeConn)(nil)
// Ping implements Conn with a canned successful result: 100ms latency, the
// boolean result set to true, and DERP region 999. In manual mode (returnPing
// non-nil) it first waits for the test to send on returnPing, and gives up
// with ctx.Err() if ctx is done before that happens. agentID is ignored.
func (f *fakeConn) Ping(ctx context.Context, agentID uuid.UUID) (time.Duration, bool, *ipnstate.PingResult, error) {
	if f.returnPing != nil {
		select {
		case <-f.returnPing:
			// Released by the test; fall through to the canned result.
		case <-ctx.Done():
			return 0, false, nil, ctx.Err()
		}
	}
	result := &ipnstate.PingResult{DERPRegionID: 999}
	return 100 * time.Millisecond, true, result, nil
}
// CurrentWorkspaceState returns the workspace state the fake was constructed
// with. It never fails.
func (f *fakeConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) {
	snapshot := f.state
	return snapshot, nil
}
// GetPeerDiagnostics reports the configured handshake time for every peer,
// regardless of the ID passed in. All other diagnostic fields are left at
// their zero values.
func (f *fakeConn) GetPeerDiagnostics(uuid.UUID) tailnet.PeerDiagnostics {
	var diag tailnet.PeerDiagnostics
	diag.LastWireguardHandshake = f.hsTime
	return diag
}
// Close signals shutdown by closing f.closed. It is safe to call repeatedly:
// the channel is closed only on the first call. It always returns nil.
func (f *fakeConn) Close() error {
	signalClosed := func() { close(f.closed) }
	f.doClose.Do(signalClosed)
	return nil
}
// TestTunnel_StartStop drives a tunnel through a Start RPC followed by a Stop
// RPC and checks that the client is asked for a connection on Start, that the
// connection is closed on Stop, and that each RPC gets the matching response.
func TestTunnel_StartStop(t *testing.T) {
	t.Parallel()

	ctx := testutil.Context(t, testutil.WaitShort)
	client := newFakeClient(ctx, t)
	conn := newFakeConn(tailnet.WorkspaceUpdate{}, time.Time{})

	_, mgr := setupTunnel(t, ctx, client, quartz.NewMock(t))

	var resp *TunnelMessage
	errCh := make(chan error, 1)
	// call performs one unary RPC and publishes its outcome. resp is written
	// before the send on errCh, so reading resp after receiving from errCh is
	// properly synchronized.
	call := func(msg *ManagerMessage) {
		reply, rpcErr := mgr.unaryRPC(ctx, msg)
		resp = reply
		errCh <- rpcErr
	}

	// When: we start the tunnel
	startMsg := &ManagerMessage{
		Msg: &ManagerMessage_Start{
			Start: &StartRequest{
				TunnelFileDescriptor: 2,
				CoderUrl:             "https://coder.example.com",
				ApiToken:             "fakeToken",
				Headers: []*StartRequest_Header{
					{Name: "X-Test-Header", Value: "test"},
				},
				DeviceOs:            "macOS",
				DeviceId:            "device001",
				CoderDesktopVersion: "0.24.8",
			},
		},
	}
	go call(startMsg)

	// Then: `NewConn` is called,
	testutil.RequireSend(ctx, t, client.ch, conn)

	// And: a response is received
	require.NoError(t, testutil.TryReceive(ctx, t, errCh))
	_, isStart := resp.Msg.(*TunnelMessage_Start)
	require.True(t, isStart)

	// When: we stop the tunnel
	stopMsg := &ManagerMessage{
		Msg: &ManagerMessage_Stop{},
	}
	go call(stopMsg)

	// Then: `Close` is called on the connection
	testutil.TryReceive(ctx, t, conn.closed)

	// And: a Stop response is received
	require.NoError(t, testutil.TryReceive(ctx, t, errCh))
	_, isStop := resp.Msg.(*TunnelMessage_Stop)
	require.True(t, isStop)

	require.NoError(t, mgr.Close())
}
// TestTunnel_PeerUpdate covers the two ways a PeerUpdate reaches the manager:
// pushed by the tunnel (no RPC header) when Update is called with a workspace
// change, and returned as the response to a GetPeerUpdate RPC, in which case
// it is built from the connection's current workspace state.
func TestTunnel_PeerUpdate(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wsID1 := uuid.UUID{1}
wsID2 := uuid.UUID{2}
client := newFakeClient(ctx, t)
// The connection's own state holds both workspaces; the pushed update below
// only mentions wsID2, which lets the two code paths be told apart.
conn := newFakeConn(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wsID1,
},
{
ID: wsID2,
},
},
}, time.Time{})
tun, mgr := setupTunnel(t, ctx, client, quartz.NewMock(t))
errCh := make(chan error, 1)
var resp *TunnelMessage
// Start the tunnel. resp is written before errCh is sent on, so it is safe
// to read once the error has been received.
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSend(ctx, t, client.ch, conn)
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
// When: we inform the tunnel of a WorkspaceUpdate
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wsID2,
},
},
})
require.NoError(t, err)
// Then: the tunnel sends a PeerUpdate message
req := testutil.TryReceive(ctx, t, mgr.requests)
// A nil Rpc header means the message was pushed, not sent as an RPC.
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedWorkspaces, 1)
require.Equal(t, wsID2[:], req.msg.GetPeerUpdate().UpsertedWorkspaces[0].Id)
// When: the manager requests a PeerUpdate
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_GetPeerUpdate{},
})
resp = r
errCh <- err
}()
// Then: a PeerUpdate message is sent using the Conn's state
err = testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok = resp.Msg.(*TunnelMessage_PeerUpdate)
require.True(t, ok)
require.Len(t, resp.GetPeerUpdate().UpsertedWorkspaces, 2)
require.Equal(t, wsID1[:], resp.GetPeerUpdate().UpsertedWorkspaces[0].Id)
require.Equal(t, wsID2[:], resp.GetPeerUpdate().UpsertedWorkspaces[1].Id)
}
// TestTunnel_NetworkSettings checks the tunnel-initiated NetworkSettings RPC:
// ApplyNetworkSettings must send the request to the manager and return without
// error once the manager answers with a successful response.
func TestTunnel_NetworkSettings(t *testing.T) {
	t.Parallel()

	ctx := testutil.Context(t, testutil.WaitShort)
	client := newFakeClient(ctx, t)
	conn := newFakeConn(tailnet.WorkspaceUpdate{}, time.Time{})
	tun, mgr := setupTunnel(t, ctx, client, quartz.NewMock(t))

	var resp *TunnelMessage
	errCh := make(chan error, 1)

	// Bring the tunnel up first. resp is assigned before the send on errCh, so
	// it may be read safely after the receive below.
	startMsg := &ManagerMessage{
		Msg: &ManagerMessage_Start{
			Start: &StartRequest{
				TunnelFileDescriptor: 2,
				CoderUrl:             "https://coder.example.com",
				ApiToken:             "fakeToken",
			},
		},
	}
	go func() {
		started, startErr := mgr.unaryRPC(ctx, startMsg)
		resp = started
		errCh <- startErr
	}()
	testutil.RequireSend(ctx, t, client.ch, conn)
	require.NoError(t, testutil.TryReceive(ctx, t, errCh))
	_, isStart := resp.Msg.(*TunnelMessage_Start)
	require.True(t, isStart)

	// When: we inform the tunnel of network settings
	settings := &NetworkSettingsRequest{
		Mtu: 1200,
	}
	go func() {
		errCh <- tun.ApplyNetworkSettings(ctx, settings)
	}()

	// Then: the tunnel sends a NetworkSettings message
	req := testutil.TryReceive(ctx, t, mgr.requests)
	require.NotNil(t, req.msg.Rpc)
	require.Equal(t, uint32(1200), req.msg.GetNetworkSettings().Mtu)

	// Answer the RPC from the manager side, echoing the request's message ID.
	go func() {
		reply := &ManagerMessage{
			Rpc: &RPC{ResponseTo: req.msg.Rpc.MsgId},
			Msg: &ManagerMessage_NetworkSettings{
				NetworkSettings: &NetworkSettingsResponse{
					Success: true,
				},
			},
		}
		testutil.RequireSend(ctx, t, mgr.sendCh, reply)
	}()

	// And: `ApplyNetworkSettings` returns without error once the manager responds
	require.NoError(t, testutil.TryReceive(ctx, t, errCh))
}
// TestUpdater_createPeerUpdate checks the conversion of a
// tailnet.WorkspaceUpdate into the PeerUpdate protocol message: workspaces and
// agents are copied over, each agent's hosts are flattened into FQDN and IP
// lists, upserted agents carry the connection's last handshake time, and
// deleted agents carry none.
func TestUpdater_createPeerUpdate(t *testing.T) {
	t.Parallel()

	w1ID := uuid.UUID{1}
	w2ID := uuid.UUID{2}
	w1a1ID := uuid.UUID{4}
	w2a1ID := uuid.UUID{5}
	w1a1IP := netip.MustParseAddr("fd60:627a:a42b:0101::")
	w2a1IP := netip.MustParseAddr("fd60:627a:a42b:0301::")

	ctx := testutil.Context(t, testutil.WaitShort)

	hsTime := time.Now().Add(-time.Minute).UTC()
	// Named u rather than updater so the local does not shadow the type.
	u := updater{
		ctx:         ctx,
		netLoopDone: make(chan struct{}),
		agents:      map[uuid.UUID]agentWithPing{},
		workspaces:  map[uuid.UUID]tailnet.Workspace{},
		conn:        newFakeConn(tailnet.WorkspaceUpdate{}, hsTime),
	}

	update := u.createPeerUpdateLocked(tailnet.WorkspaceUpdate{
		UpsertedWorkspaces: []*tailnet.Workspace{
			{ID: w1ID, Name: "w1", Status: proto.Workspace_STARTING},
		},
		UpsertedAgents: []*tailnet.Agent{
			{
				ID: w1a1ID, Name: "w1a1", WorkspaceID: w1ID,
				Hosts: map[dnsname.FQDN][]netip.Addr{
					"w1.coder.":            {w1a1IP},
					"w1a1.w1.me.coder.":    {w1a1IP},
					"w1a1.w1.testy.coder.": {w1a1IP},
				},
			},
		},
		DeletedWorkspaces: []*tailnet.Workspace{
			{ID: w2ID, Name: "w2", Status: proto.Workspace_STOPPED},
		},
		DeletedAgents: []*tailnet.Agent{
			{
				ID: w2a1ID, Name: "w2a1", WorkspaceID: w2ID,
				Hosts: map[dnsname.FQDN][]netip.Addr{
					"w2.coder.":            {w2a1IP},
					"w2a1.w2.me.coder.":    {w2a1IP},
					"w2a1.w2.testy.coder.": {w2a1IP},
				},
			},
		},
	})

	// Guard both index expressions below so that a missing agent is reported
	// as a test failure rather than an index-out-of-range panic.
	require.Len(t, update.UpsertedAgents, 1)
	require.Len(t, update.DeletedAgents, 1)

	// The FQDNs come from a map, so their order is not deterministic; sort
	// them before comparing against the expected message.
	slices.SortFunc(update.UpsertedAgents[0].Fqdn, strings.Compare)
	slices.SortFunc(update.DeletedAgents[0].Fqdn, strings.Compare)

	expected := &PeerUpdate{
		UpsertedWorkspaces: []*Workspace{
			{Id: w1ID[:], Name: "w1", Status: Workspace_Status(proto.Workspace_STARTING)},
		},
		UpsertedAgents: []*Agent{
			{
				Id: w1a1ID[:], Name: "w1a1", WorkspaceId: w1ID[:],
				Fqdn:          []string{"w1.coder.", "w1a1.w1.me.coder.", "w1a1.w1.testy.coder."},
				IpAddrs:       []string{w1a1IP.String()},
				LastHandshake: timestamppb.New(hsTime),
			},
		},
		DeletedWorkspaces: []*Workspace{
			{Id: w2ID[:], Name: "w2", Status: Workspace_Status(proto.Workspace_STOPPED)},
		},
		DeletedAgents: []*Agent{
			{
				Id: w2a1ID[:], Name: "w2a1", WorkspaceId: w2ID[:],
				Fqdn:          []string{"w2.coder.", "w2a1.w2.me.coder.", "w2a1.w2.testy.coder."},
				IpAddrs:       []string{w2a1IP.String()},
				LastHandshake: nil,
			},
		},
	}
	// require.Equal takes (expected, actual); keeping that order makes the
	// failure diff label the two sides correctly.
	require.Equal(t, expected, update)
}
// TestTunnel_sendAgentUpdate exercises the periodic agent updates that the
// tunnel pushes on each tick of the (mock) clock. It checks that every tick
// re-sends the known agents with their last handshake time, that ping latency
// eventually shows up, and that later upserts and deletions change which
// agents are included.
func TestTunnel_sendAgentUpdate(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
mClock := quartz.NewMock(t)
wID1 := uuid.UUID{1}
aID1 := uuid.UUID{2}
aID2 := uuid.UUID{3}
hsTime := time.Now().Add(-time.Minute).UTC()
client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime)
tun, mgr := setupTunnel(t, ctx, client, mClock)
errCh := make(chan error, 1)
var resp *TunnelMessage
// Start the tunnel. resp is written before errCh is sent on, so it is safe
// to read once the error has been received.
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSend(ctx, t, client.ch, conn)
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
// Inform the tunnel of the initial state
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID1,
Name: "agent1",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
// The Update call itself produces a pushed (non-RPC) PeerUpdate.
req := testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
// `sendAgentUpdate` produces the same PeerUpdate message until an agent
// update is received
for range 2 {
// Fire the next scheduled mock-clock event to trigger a periodic update.
mClock.AdvanceNext()
// Then: the tunnel sends a PeerUpdate message of agent upserts,
// with the last handshake and latency set
req = testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
}
// Latency is gathered in the background, so it'll eventually be sent
// The expected values (100ms, "Coder Region") are the ones fakeConn's Ping
// and DERPMap return.
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
if len(req.msg.GetPeerUpdate().UpsertedAgents) == 0 {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing == nil {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds() != 100 {
return false
}
return req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.PreferredDerp == "Coder Region"
}, testutil.IntervalFast)
// Upsert a new agent
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID2,
Name: "agent2",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent2.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
// Discard one message before ticking the clock; presumably this is the
// PeerUpdate pushed in response to the Update call above.
testutil.TryReceive(ctx, t, mgr.requests)
// The new update includes the new agent
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 2)
// Sort by name so that index 0 is agent1 and index 1 is agent2.
slices.SortFunc(req.msg.GetPeerUpdate().UpsertedAgents, func(a, b *Agent) int {
return strings.Compare(a.Name, b.Name)
})
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
// The latency of the first agent is still set
require.NotNil(t, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
require.EqualValues(t, 100, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds())
require.Equal(t, aID2[:], req.msg.GetPeerUpdate().UpsertedAgents[1].Id)
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[1].LastHandshake.AsTime())
// Delete an agent
err = tun.Update(tailnet.WorkspaceUpdate{
DeletedAgents: []*tailnet.Agent{
{
ID: aID1,
Name: "agent1",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
// Discard one message before ticking the clock; presumably this is the
// PeerUpdate pushed in response to the Update call above.
testutil.TryReceive(ctx, t, mgr.requests)
// The new update doesn't include the deleted agent
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID2[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
// Eventually the second agent's latency is set
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
if len(req.msg.GetPeerUpdate().UpsertedAgents) == 0 {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing == nil {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds() != 100 {
return false
}
return req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.PreferredDerp == "Coder Region"
}, testutil.IntervalFast)
}
// TestTunnel_sendAgentUpdateReconnect simulates a reconnect by sending a
// Snapshot-kind update that lists the same workspace but a different agent.
// The resulting PeerUpdate must upsert the new agent and report the
// previously known agent as deleted, while leaving the workspace alone.
func TestTunnel_sendAgentUpdateReconnect(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
mClock := quartz.NewMock(t)
wID1 := uuid.UUID{1}
aID1 := uuid.UUID{2}
aID2 := uuid.UUID{3}
hsTime := time.Now().Add(-time.Minute).UTC()
client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime)
tun, mgr := setupTunnel(t, ctx, client, mClock)
errCh := make(chan error, 1)
var resp *TunnelMessage
// Start the tunnel. resp is written before errCh is sent on, so it is safe
// to read once the error has been received.
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSend(ctx, t, client.ch, conn)
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
// Inform the tunnel of the initial state
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID1,
Name: "agent1",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
req := testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
// Upsert a new agent simulating a reconnect
// (Kind: tailnet.Snapshot marks this update as a snapshot; agent1 is
// absent from it.)
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID2,
Name: "agent2",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent2.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
Kind: tailnet.Snapshot,
})
require.NoError(t, err)
// The new update upserts only the new agent and deletes the old one; the
// workspace is still present, so no workspace is deleted.
req = testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
peerUpdate := req.msg.GetPeerUpdate()
require.NotNil(t, peerUpdate)
require.Len(t, peerUpdate.UpsertedAgents, 1)
require.Len(t, peerUpdate.DeletedAgents, 1)
require.Len(t, peerUpdate.DeletedWorkspaces, 0)
require.Equal(t, aID2[:], peerUpdate.UpsertedAgents[0].Id)
require.Equal(t, hsTime, peerUpdate.UpsertedAgents[0].LastHandshake.AsTime())
require.Equal(t, aID1[:], peerUpdate.DeletedAgents[0].Id)
}
// TestTunnel_sendAgentUpdateWorkspaceReconnect simulates a reconnect in which
// the Snapshot-kind update lists an entirely different workspace and agent.
// The resulting PeerUpdate must upsert the new workspace and agent and report
// both the previously known workspace and its agent as deleted.
func TestTunnel_sendAgentUpdateWorkspaceReconnect(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
mClock := quartz.NewMock(t)
wID1 := uuid.UUID{1}
wID2 := uuid.UUID{2}
aID1 := uuid.UUID{3}
aID3 := uuid.UUID{4}
hsTime := time.Now().Add(-time.Minute).UTC()
client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime)
tun, mgr := setupTunnel(t, ctx, client, mClock)
errCh := make(chan error, 1)
var resp *TunnelMessage
// Start the tunnel. resp is written before errCh is sent on, so it is safe
// to read once the error has been received.
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSend(ctx, t, client.ch, conn)
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
// Inform the tunnel of the initial state
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID1,
Name: "agent1",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
req := testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
// Upsert a new agent with a new workspace while simulating a reconnect
// (Kind: tailnet.Snapshot marks this update as a snapshot; w1 and agent1
// are absent from it.)
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID2, Name: "w2", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID3,
Name: "agent3",
WorkspaceID: wID2,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent3.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
Kind: tailnet.Snapshot,
})
require.NoError(t, err)
// The new update upserts only the new workspace and agent, and deletes the
// old workspace and agent.
// NOTE(review): the clock is advanced both before and after the receive;
// the reason is not evident from this test — confirm whether both calls
// are needed.
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
mClock.AdvanceNext()
require.Nil(t, req.msg.Rpc)
peerUpdate := req.msg.GetPeerUpdate()
require.NotNil(t, peerUpdate)
require.Len(t, peerUpdate.UpsertedWorkspaces, 1)
require.Len(t, peerUpdate.UpsertedAgents, 1)
require.Len(t, peerUpdate.DeletedAgents, 1)
require.Len(t, peerUpdate.DeletedWorkspaces, 1)
require.Equal(t, wID2[:], peerUpdate.UpsertedWorkspaces[0].Id)
require.Equal(t, aID3[:], peerUpdate.UpsertedAgents[0].Id)
require.Equal(t, hsTime, peerUpdate.UpsertedAgents[0].LastHandshake.AsTime())
require.Equal(t, aID1[:], peerUpdate.DeletedAgents[0].Id)
require.Equal(t, wID1[:], peerUpdate.DeletedWorkspaces[0].Id)
}
// TestTunnel_slowPing uses a connection whose pings block until released by
// the test. It checks that periodic agent updates keep arriving, without any
// ping data, while the ping is outstanding, and that once a ping is allowed to
// complete its latency and DERP region eventually show up in an update.
func TestTunnel_slowPing(t *testing.T) {
	t.Parallel()

	ctx := testutil.Context(t, testutil.WaitShort)

	mClock := quartz.NewMock(t)

	wID1 := uuid.UUID{1}
	aID1 := uuid.UUID{2}
	hsTime := time.Now().Add(-time.Minute).UTC()

	client := newFakeClient(ctx, t)
	// Manual pings: Ping blocks until the test sends on conn.returnPing.
	conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime).withManualPings()

	tun, mgr := setupTunnel(t, ctx, client, mClock)
	errCh := make(chan error, 1)
	var resp *TunnelMessage
	// Start the tunnel. resp is written before errCh is sent on, so it is safe
	// to read once the error has been received.
	go func() {
		r, err := mgr.unaryRPC(ctx, &ManagerMessage{
			Msg: &ManagerMessage_Start{
				Start: &StartRequest{
					TunnelFileDescriptor: 2,
					CoderUrl:             "https://coder.example.com",
					ApiToken:             "fakeToken",
				},
			},
		})
		resp = r
		errCh <- err
	}()
	testutil.RequireSend(ctx, t, client.ch, conn)
	err := testutil.TryReceive(ctx, t, errCh)
	require.NoError(t, err)
	_, ok := resp.Msg.(*TunnelMessage_Start)
	require.True(t, ok)

	// Inform the tunnel of the initial state
	err = tun.Update(tailnet.WorkspaceUpdate{
		UpsertedWorkspaces: []*tailnet.Workspace{
			{
				ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
			},
		},
		UpsertedAgents: []*tailnet.Agent{
			{
				ID:          aID1,
				Name:        "agent1",
				WorkspaceID: wID1,
				Hosts: map[dnsname.FQDN][]netip.Addr{
					"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
				},
			},
		},
	})
	require.NoError(t, err)
	req := testutil.TryReceive(ctx, t, mgr.requests)
	require.Nil(t, req.msg.Rpc)
	require.NotNil(t, req.msg.GetPeerUpdate())
	require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
	require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)

	// We can't check that it *never* pings, so the best we can do is
	// check it doesn't ping even with 5 goroutines attempting to,
	// and that updates are received as normal
	for range 5 {
		mClock.AdvanceNext()
		// Receive the update produced by this tick. Without this the
		// assertion below would only ever re-check the stale message
		// received before the loop.
		req = testutil.TryReceive(ctx, t, mgr.requests)
		require.Nil(t, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
	}

	// Provided that it hasn't been 5 seconds since the last AdvanceNext call,
	// there'll be a ping in-flight that will return with this message
	testutil.RequireSend(ctx, t, conn.returnPing, struct{}{})

	// Which will mean we'll eventually receive a PeerUpdate with the ping
	testutil.Eventually(ctx, t, func(ctx context.Context) bool {
		mClock.AdvanceNext()
		req = testutil.TryReceive(ctx, t, mgr.requests)
		if len(req.msg.GetPeerUpdate().UpsertedAgents) == 0 {
			return false
		}
		if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing == nil {
			return false
		}
		if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds() != 100 {
			return false
		}
		return req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.PreferredDerp == "Coder Region"
	}, testutil.IntervalFast)
}
// TestTunnel_stopMidPing uses a connection whose pings never return (nothing
// is ever sent on returnPing) and checks that the tunnel can still be stopped
// cleanly: the connection is closed and the Stop RPC succeeds.
func TestTunnel_stopMidPing(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
mClock := quartz.NewMock(t)
wID1 := uuid.UUID{1}
aID1 := uuid.UUID{2}
hsTime := time.Now().Add(-time.Minute).UTC()
client := newFakeClient(ctx, t)
// Manual pings: Ping blocks until the test sends on conn.returnPing, which
// this test never does.
conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime).withManualPings()
tun, mgr := setupTunnel(t, ctx, client, mClock)
errCh := make(chan error, 1)
var resp *TunnelMessage
// Start the tunnel. resp is written before errCh is sent on, so it is safe
// to read once the error has been received.
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSend(ctx, t, client.ch, conn)
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
// Inform the tunnel of the initial state
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID1,
Name: "agent1",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
req := testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
// We'll have some pings in flight when we stop
for range 5 {
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
// No ping has been released, so no latency can have been recorded.
require.Nil(t, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
}
// Stop the tunnel
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Stop{},
})
resp = r
errCh <- err
}()
// The connection must be closed, and the Stop RPC must succeed.
testutil.TryReceive(ctx, t, conn.closed)
err = testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok = resp.Msg.(*TunnelMessage_Stop)
require.True(t, ok)
}
// setupTunnel wires a Tunnel and a manager-side speaker together over an
// in-memory net.Pipe and returns both once they are ready. The Tunnel is
// created with the given client and mock clock. Both pipe ends are closed
// when the test finishes.
//
// The Tunnel and the speaker are constructed concurrently and both must
// finish without error before setupTunnel continues. The function does not
// return until a NewTicker call on mClock has been trapped and released, so
// callers can use mClock.AdvanceNext() straight away.
//
//nolint:revive // t takes precedence
func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient, mClock *quartz.Mock) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) {
mp, tp := net.Pipe()
t.Cleanup(func() { _ = mp.Close() })
t.Cleanup(func() { _ = tp.Close() })
logger := testutil.Logger(t)
// We're creating a trap for the mClock to ensure that
// AdvanceNext() is not called before the ticker is created.
trap := mClock.Trap().NewTicker()
defer trap.Close()
var tun *Tunnel
var mgr *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]
// Buffered for both goroutines so neither blocks on send if the test has
// already failed on the first result.
errCh := make(chan error, 2)
// tun and mgr are each written before the corresponding send on errCh, so
// they are safe to read after both results have been received below.
go func() {
tunnel, err := NewTunnel(ctx, logger.Named("tunnel"), tp, client, WithClock(mClock))
tun = tunnel
errCh <- err
}()
go func() {
manager, err := newSpeaker[*ManagerMessage, *TunnelMessage](ctx, logger.Named("manager"), mp, SpeakerRoleManager, SpeakerRoleTunnel)
mgr = manager
errCh <- err
}()
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
err = testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
mgr.start()
// We're releasing the trap to allow the clock to advance the ticker.
trap.MustWait(ctx).MustRelease(ctx)
return tun, mgr
}
// TestProcessFreshState is a table-driven test of processSnapshotUpdate. Each
// case starts from a set of already-known agents and workspaces, applies a
// Snapshot-kind update, and checks which deletions end up on the update: any
// known agent or workspace that the snapshot does not list is expected to be
// added to its DeletedAgents / DeletedWorkspaces.
func TestProcessFreshState(t *testing.T) {
t.Parallel()
wsID1 := uuid.New()
wsID2 := uuid.New()
wsID3 := uuid.New()
wsID4 := uuid.New()
agentID1 := uuid.New()
agentID2 := uuid.New()
agentID3 := uuid.New()
agentID4 := uuid.New()
// agent1 and agent4 both belong to ws1; ws4 has no agent at all.
agent1 := tailnet.Agent{ID: agentID1, Name: "agent1", WorkspaceID: wsID1}
agent2 := tailnet.Agent{ID: agentID2, Name: "agent2", WorkspaceID: wsID2}
agent3 := tailnet.Agent{ID: agentID3, Name: "agent3", WorkspaceID: wsID3}
agent4 := tailnet.Agent{ID: agentID4, Name: "agent4", WorkspaceID: wsID1}
ws1 := tailnet.Workspace{ID: wsID1, Name: "ws1", Status: proto.Workspace_RUNNING}
ws2 := tailnet.Workspace{ID: wsID2, Name: "ws2", Status: proto.Workspace_RUNNING}
ws3 := tailnet.Workspace{ID: wsID3, Name: "ws3", Status: proto.Workspace_RUNNING}
ws4 := tailnet.Workspace{ID: wsID4, Name: "ws4", Status: proto.Workspace_RUNNING}
// Default starting state shared by most cases. Each subtest works on its
// own copy, so these maps are never mutated.
initialAgents := map[uuid.UUID]tailnet.Agent{
agentID1: agent1,
agentID2: agent2,
agentID4: agent4,
}
initialWorkspaces := map[uuid.UUID]tailnet.Workspace{
wsID1: ws1,
wsID2: ws2,
}
tests := []struct {
name string
initialAgents map[uuid.UUID]tailnet.Agent
initialWorkspaces map[uuid.UUID]tailnet.Workspace
update *tailnet.WorkspaceUpdate
expectedDelete *tailnet.WorkspaceUpdate // We only care about deletions added by the function
}{
{
name: "NoChange",
initialAgents: initialAgents,
initialWorkspaces: initialWorkspaces,
update: &tailnet.WorkspaceUpdate{
Kind: tailnet.Snapshot,
UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2},
UpsertedAgents: []*tailnet.Agent{&agent1, &agent2, &agent4},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
expectedDelete: &tailnet.WorkspaceUpdate{ // Expect no *additional* deletions
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
},
{
name: "AgentAdded", // Agent 3 added in update
initialAgents: initialAgents,
initialWorkspaces: initialWorkspaces,
update: &tailnet.WorkspaceUpdate{
Kind: tailnet.Snapshot,
UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2, &ws3},
UpsertedAgents: []*tailnet.Agent{&agent1, &agent2, &agent3, &agent4},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
expectedDelete: &tailnet.WorkspaceUpdate{
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
},
{
name: "AgentRemovedWorkspaceAlsoRemoved", // Agent 2 removed, ws2 also removed
initialAgents: initialAgents,
initialWorkspaces: initialWorkspaces,
update: &tailnet.WorkspaceUpdate{
Kind: tailnet.Snapshot,
UpsertedWorkspaces: []*tailnet.Workspace{&ws1}, // ws2 not present
UpsertedAgents: []*tailnet.Agent{&agent1, &agent4}, // agent2 not present
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
expectedDelete: &tailnet.WorkspaceUpdate{
DeletedWorkspaces: []*tailnet.Workspace{
{ID: wsID2, Name: "ws2", Status: proto.Workspace_RUNNING},
}, // Expect ws2 to be deleted
DeletedAgents: []*tailnet.Agent{ // Expect agent2 to be deleted
{ID: agentID2, Name: "agent2", WorkspaceID: wsID2},
},
},
},
{
name: "AgentRemovedWorkspaceStays", // Agent 4 removed, but ws1 stays (due to agent1)
initialAgents: initialAgents,
initialWorkspaces: initialWorkspaces,
update: &tailnet.WorkspaceUpdate{
Kind: tailnet.Snapshot,
UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2}, // ws1 still present
UpsertedAgents: []*tailnet.Agent{&agent1, &agent2}, // agent4 not present
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
expectedDelete: &tailnet.WorkspaceUpdate{
DeletedWorkspaces: []*tailnet.Workspace{}, // ws1 should NOT be deleted
DeletedAgents: []*tailnet.Agent{ // Expect agent4 to be deleted
{ID: agentID4, Name: "agent4", WorkspaceID: wsID1},
},
},
},
{
name: "InitialAgentsEmpty",
initialAgents: map[uuid.UUID]tailnet.Agent{}, // Start with no agents known
initialWorkspaces: map[uuid.UUID]tailnet.Workspace{},
update: &tailnet.WorkspaceUpdate{
Kind: tailnet.Snapshot,
UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2},
UpsertedAgents: []*tailnet.Agent{&agent1, &agent2},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
expectedDelete: &tailnet.WorkspaceUpdate{ // Expect no deletions added
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
},
{
name: "UpdateEmpty", // Snapshot says nothing exists
initialAgents: initialAgents,
initialWorkspaces: initialWorkspaces,
update: &tailnet.WorkspaceUpdate{
Kind: tailnet.Snapshot,
UpsertedWorkspaces: []*tailnet.Workspace{},
UpsertedAgents: []*tailnet.Agent{},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
expectedDelete: &tailnet.WorkspaceUpdate{ // Expect all initial agents/workspaces to be deleted
DeletedWorkspaces: []*tailnet.Workspace{
{ID: wsID1, Name: "ws1", Status: proto.Workspace_RUNNING},
{ID: wsID2, Name: "ws2", Status: proto.Workspace_RUNNING},
}, // ws1 and ws2 deleted
DeletedAgents: []*tailnet.Agent{ // agent1, agent2, agent4 deleted
{ID: agentID1, Name: "agent1", WorkspaceID: wsID1},
{ID: agentID2, Name: "agent2", WorkspaceID: wsID2},
{ID: agentID4, Name: "agent4", WorkspaceID: wsID1},
},
},
},
{
name: "WorkspaceWithNoAgents", // A known workspace without agents is missing from the snapshot
initialAgents: initialAgents,
initialWorkspaces: map[uuid.UUID]tailnet.Workspace{wsID1: ws1, wsID2: ws2, wsID4: ws4}, // ws4 has no agents
update: &tailnet.WorkspaceUpdate{
Kind: tailnet.Snapshot,
UpsertedWorkspaces: []*tailnet.Workspace{&ws1, &ws2},
UpsertedAgents: []*tailnet.Agent{&agent1, &agent2, &agent4},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
},
expectedDelete: &tailnet.WorkspaceUpdate{ // Expect only ws4 to be deleted; no agents are removed
DeletedWorkspaces: []*tailnet.Workspace{
{ID: wsID4, Name: "ws4", Status: proto.Workspace_RUNNING},
}, // ws4 should be deleted
DeletedAgents: []*tailnet.Agent{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
// Work on per-subtest copies so parallel cases never share the known
// agent and workspace maps.
agentsCopy := maputil.Map(tt.initialAgents, func(a tailnet.Agent) agentWithPing {
return agentWithPing{
Agent: a.Clone(),
lastPing: nil,
}
})
workspaceCopy := maps.Clone(tt.initialWorkspaces)
// processSnapshotUpdate returns nothing; its result is read back from
// tt.update, which it is expected to modify in place.
processSnapshotUpdate(tt.update, agentsCopy, workspaceCopy)
// Deletion order is not significant, so compare as multisets.
require.ElementsMatch(t, tt.expectedDelete.DeletedAgents, tt.update.DeletedAgents, "DeletedAgents mismatch")
require.ElementsMatch(t, tt.expectedDelete.DeletedWorkspaces, tt.update.DeletedWorkspaces, "DeletedWorkspaces mismatch")
})
}
}