chore(vpn): send ping results over tunnel (#18200)

Closes #17982.

The purpose of this PR is to expose network latency via the API used by Coder Desktop.

This PR has the tunnel ping all known agents every 5 seconds, in order to produce an instance of:
```proto
message LastPing {
	// latency is the RTT of the ping to the agent.
	google.protobuf.Duration latency = 1;
	// did_p2p indicates whether the ping was sent P2P, or over DERP.
	bool did_p2p = 2;
	// preferred_derp is the human readable name of the preferred DERP region,
	// or the region used for the last ping, if it was sent over DERP.
	string preferred_derp = 3;
	// preferred_derp_latency is the last known latency to the preferred DERP
	// region. Unset if the region does not appear in the DERP map.
	optional google.protobuf.Duration preferred_derp_latency = 4;
}
```
The contents of this message are stored and included on all subsequent upsertions of the agent. 
Note that we upsert existing agents every 5 seconds to update the `last_handshake` value.

On the desktop apps, this message will be used to produce a tooltip similar to that of the VS Code extension:
<img width="495" alt="image" src="https://github.com/user-attachments/assets/d8b65f3d-f536-4c64-9af9-35c1a42c92d2" />
(wording not final)

Unlike the VS Code extension, we omit:
- The latency of *all* available DERP regions. Sending a copy of this entire map for every online agent is not ideal, and it certainly doesn't make sense for it to live on the `Agent` or `LastPing` message.
If we do want to expose this info on Coder Desktop, we should consider how best to do so; maybe we want to include it on a more generic `Netcheck` message.
- The current throughput (bytes up/down). This is out of scope for the linked issue, and is non-trivial to implement. I'm also not sure of its value given the frequency at which we send these status updates (every 5 seconds).
If we want to expose it, it'll be in a separate PR.

<img width="343" alt="image" src="https://github.com/user-attachments/assets/8447d03b-9721-4111-8ac1-332d70a1e8f1" />
This commit is contained in:
Ethan
2025-06-06 14:18:57 +10:00
committed by GitHub
parent b4f71b70aa
commit 0076e8479f
12 changed files with 973 additions and 318 deletions
+8 -37
View File
@@ -16,7 +16,6 @@ import (
"path/filepath" "path/filepath"
"regexp" "regexp"
"slices" "slices"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -31,7 +30,6 @@ import (
"golang.org/x/term" "golang.org/x/term"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
"tailscale.com/tailcfg"
"tailscale.com/types/netlogtype" "tailscale.com/types/netlogtype"
"cdr.dev/slog" "cdr.dev/slog"
@@ -40,11 +38,13 @@ import (
"github.com/coder/coder/v2/cli/cliui" "github.com/coder/coder/v2/cli/cliui"
"github.com/coder/coder/v2/cli/cliutil" "github.com/coder/coder/v2/cli/cliutil"
"github.com/coder/coder/v2/coderd/autobuild/notify" "github.com/coder/coder/v2/coderd/autobuild/notify"
"github.com/coder/coder/v2/coderd/util/maps"
"github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/pty" "github.com/coder/coder/v2/pty"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/quartz" "github.com/coder/quartz"
"github.com/coder/retry" "github.com/coder/retry"
"github.com/coder/serpent" "github.com/coder/serpent"
@@ -1456,28 +1456,6 @@ func collectNetworkStats(ctx context.Context, agentConn *workspacesdk.AgentConn,
} }
node := agentConn.Node() node := agentConn.Node()
derpMap := agentConn.DERPMap() derpMap := agentConn.DERPMap()
derpLatency := map[string]float64{}
// Convert DERP region IDs to friendly names for display in the UI.
for rawRegion, latency := range node.DERPLatency {
regionParts := strings.SplitN(rawRegion, "-", 2)
regionID, err := strconv.Atoi(regionParts[0])
if err != nil {
continue
}
region, found := derpMap.Regions[regionID]
if !found {
// It's possible that a workspace agent is using an old DERPMap
// and reports regions that do not exist. If that's the case,
// report the region as unknown!
region = &tailcfg.DERPRegion{
RegionID: regionID,
RegionName: fmt.Sprintf("Unnamed %d", regionID),
}
}
// Convert the microseconds to milliseconds.
derpLatency[region.RegionName] = latency * 1000
}
totalRx := uint64(0) totalRx := uint64(0)
totalTx := uint64(0) totalTx := uint64(0)
@@ -1491,27 +1469,20 @@ func collectNetworkStats(ctx context.Context, agentConn *workspacesdk.AgentConn,
uploadSecs := float64(totalTx) / dur.Seconds() uploadSecs := float64(totalTx) / dur.Seconds()
downloadSecs := float64(totalRx) / dur.Seconds() downloadSecs := float64(totalRx) / dur.Seconds()
// Sometimes the preferred DERP doesn't match the one we're actually preferredDerpName := tailnet.ExtractPreferredDERPName(pingResult, node, derpMap)
// connected with. Perhaps because the agent prefers a different DERP and derpLatency := tailnet.ExtractDERPLatency(node, derpMap)
// we're using that server instead.
preferredDerpID := node.PreferredDERP
if pingResult.DERPRegionID != 0 {
preferredDerpID = pingResult.DERPRegionID
}
preferredDerp, ok := derpMap.Regions[preferredDerpID]
preferredDerpName := fmt.Sprintf("Unnamed %d", preferredDerpID)
if ok {
preferredDerpName = preferredDerp.RegionName
}
if _, ok := derpLatency[preferredDerpName]; !ok { if _, ok := derpLatency[preferredDerpName]; !ok {
derpLatency[preferredDerpName] = 0 derpLatency[preferredDerpName] = 0
} }
derpLatencyMs := maps.Map(derpLatency, func(dur time.Duration) float64 {
return float64(dur) / float64(time.Millisecond)
})
return &sshNetworkStats{ return &sshNetworkStats{
P2P: p2p, P2P: p2p,
Latency: float64(latency.Microseconds()) / 1000, Latency: float64(latency.Microseconds()) / 1000,
PreferredDERP: preferredDerpName, PreferredDERP: preferredDerpName,
DERPLatency: derpLatency, DERPLatency: derpLatencyMs,
UploadBytesSec: int64(uploadSecs), UploadBytesSec: int64(uploadSecs),
DownloadBytesSec: int64(downloadSecs), DownloadBytesSec: int64(downloadSecs),
}, nil }, nil
-8
View File
@@ -47,14 +47,6 @@ func ListLazy[F any, T any](convert func(F) T) func(list []F) []T {
} }
} }
func Map[K comparable, F any, T any](params map[K]F, convert func(F) T) map[K]T {
into := make(map[K]T)
for k, item := range params {
into[k] = convert(item)
}
return into
}
type ExternalAuthMeta struct { type ExternalAuthMeta struct {
Authenticated bool Authenticated bool
ValidateError string ValidateError string
+8
View File
@@ -6,6 +6,14 @@ import (
"golang.org/x/exp/constraints" "golang.org/x/exp/constraints"
) )
// Map returns a new map containing every key of params, with each value
// replaced by the result of calling convert on it. The input map is not
// modified. A nil or empty input yields an empty, non-nil map.
func Map[K comparable, F any, T any](params map[K]F, convert func(F) T) map[K]T {
	// The output always has exactly as many entries as the input, so size it
	// up front to avoid incremental growth.
	into := make(map[K]T, len(params))
	for k, item := range params {
		into[k] = convert(item)
	}
	return into
}
// Subset returns true if all the keys of a are present // Subset returns true if all the keys of a are present
// in b and have the same values. // in b and have the same values.
// If the corresponding value of a[k] is the zero value in // If the corresponding value of a[k] is the zero value in
+46
View File
@@ -8,8 +8,11 @@ import (
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
"strings"
"time"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
) )
@@ -152,6 +155,49 @@ regionLoop:
return derpMap, nil return derpMap, nil
} }
// ExtractPreferredDERPName returns the display name of the DERP region
// associated with a ping. The region reported by the ping result is used when
// it is non-zero, otherwise the node's preferred region is used. Regions that
// are missing from derpMap are reported as "Unnamed <id>".
func ExtractPreferredDERPName(pingResult *ipnstate.PingResult, node *Node, derpMap *tailcfg.DERPMap) string {
	// The node's preferred DERP doesn't always match the one the connection
	// actually uses (perhaps because the agent prefers a different DERP and
	// we're using that server instead), so the region from the ping wins.
	regionID := node.PreferredDERP
	if used := pingResult.DERPRegionID; used != 0 {
		regionID = used
	}

	if region, found := derpMap.Regions[regionID]; found {
		return region.RegionName
	}
	return fmt.Sprintf("Unnamed %d", regionID)
}
// ExtractDERPLatency extracts a map of DERP region names to their latencies.
//
// Each key of node.DERPLatency must start with a numeric region ID, optionally
// followed by "-" and a suffix; keys that do not are skipped. The latency
// values are interpreted as seconds. Regions that are missing from derpMap are
// named "Unnamed <id>".
//
// Several keys can resolve to the same region name (Tailscale reports one
// entry per address family, e.g. "1-v4" and "1-v6"). In that case the lowest
// latency is kept, so the result does not depend on map iteration order.
func ExtractDERPLatency(node *Node, derpMap *tailcfg.DERPMap) map[string]time.Duration {
	latencies := make(map[string]time.Duration, len(node.DERPLatency))
	// Convert DERP region IDs to friendly names for display in the UI.
	for rawRegion, latency := range node.DERPLatency {
		rawID, _, _ := strings.Cut(rawRegion, "-")
		regionID, err := strconv.Atoi(rawID)
		if err != nil {
			continue
		}

		var regionName string
		if region, found := derpMap.Regions[regionID]; found {
			regionName = region.RegionName
		} else {
			// It's possible that a workspace agent is using an old DERPMap
			// and reports regions that do not exist. If that's the case,
			// report the region as unknown!
			regionName = fmt.Sprintf("Unnamed %d", regionID)
		}

		dur := time.Duration(latency * float64(time.Second))
		if existing, ok := latencies[regionName]; ok && existing <= dur {
			// A faster measurement for this region has already been recorded.
			continue
		}
		latencies[regionName] = dur
	}
	return latencies
}
// CompareDERPMaps returns true if the given DERPMaps are equivalent. Ordering // CompareDERPMaps returns true if the given DERPMaps are equivalent. Ordering
// of slices is ignored. // of slices is ignored.
// //
+109
View File
@@ -10,6 +10,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet"
@@ -162,3 +163,111 @@ func TestNewDERPMap(t *testing.T) {
require.ErrorContains(t, err, "DERP map has no DERP nodes") require.ErrorContains(t, err, "DERP map has no DERP nodes")
}) })
} }
// TestExtractDERPLatency checks that reported DERP latencies are keyed by
// region name, that regions missing from the DERP map get a placeholder name,
// and that keys without a numeric region ID are dropped.
func TestExtractDERPLatency(t *testing.T) {
	t.Parallel()

	regions := &tailcfg.DERPMap{
		Regions: map[int]*tailcfg.DERPRegion{
			1: {
				RegionID:   1,
				RegionName: "Region One",
				Nodes: []*tailcfg.DERPNode{
					{Name: "node1", RegionID: 1},
				},
			},
			2: {
				RegionID:   2,
				RegionName: "Region Two",
				Nodes: []*tailcfg.DERPNode{
					{Name: "node2", RegionID: 2},
				},
			},
		},
	}

	cases := []struct {
		name string
		// reported is the raw latency map on the node, in seconds.
		reported map[string]float64
		// wantMs is the complete expected result, in milliseconds.
		wantMs map[string]int64
	}{
		{
			name: "Basic",
			reported: map[string]float64{
				"1-node1": 0.05,
				"2-node2": 0.1,
			},
			wantMs: map[string]int64{
				"Region One": 50,
				"Region Two": 100,
			},
		},
		{
			name: "UnknownRegion",
			reported: map[string]float64{
				"999-node999": 0.2,
			},
			wantMs: map[string]int64{
				"Unnamed 999": 200,
			},
		},
		{
			name: "InvalidRegionFormat",
			reported: map[string]float64{
				"invalid":  0.3,
				"1-node1":  0.05,
				"abc-node": 0.15,
			},
			// Only the parseable key survives; the exact length check below
			// guarantees "invalid" and "abc-node" are absent.
			wantMs: map[string]int64{
				"Region One": 50,
			},
		},
		{
			name:     "EmptyInput",
			reported: map[string]float64{},
			wantMs:   map[string]int64{},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			got := tailnet.ExtractDERPLatency(&tailnet.Node{DERPLatency: tc.reported}, regions)

			require.Len(t, got, len(tc.wantMs))
			for regionName, ms := range tc.wantMs {
				require.Contains(t, got, regionName)
				require.EqualValues(t, ms, got[regionName].Milliseconds())
			}
		})
	}
}
// TestExtractPreferredDERPName checks that the region from the ping result
// takes precedence over the node's preferred region, and that regions missing
// from the DERP map get a placeholder name.
func TestExtractPreferredDERPName(t *testing.T) {
	t.Parallel()

	regions := &tailcfg.DERPMap{
		Regions: map[int]*tailcfg.DERPRegion{
			1: {RegionName: "New York"},
			2: {RegionName: "London"},
		},
	}

	cases := []struct {
		name         string
		pingRegionID int
		want         string
	}{
		{name: "UsesPingRegion", pingRegionID: 2, want: "London"},
		{name: "UsesNodePreferred", pingRegionID: 0, want: "New York"},
		{name: "UnknownRegion", pingRegionID: 99, want: "Unnamed 99"},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			ping := &ipnstate.PingResult{DERPRegionID: tc.pingRegionID}
			// The node always prefers region 1; only the ping result varies.
			preferring := &tailnet.Node{PreferredDERP: 1}

			got := tailnet.ExtractPreferredDERPName(ping, preferring, regions)
			require.Equal(t, tc.want, got)
		})
	}
}
+10
View File
@@ -5,11 +5,14 @@ import (
"net/http" "net/http"
"net/netip" "net/netip"
"net/url" "net/url"
"time"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/dns" "tailscale.com/net/dns"
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
"tailscale.com/tailcfg"
"tailscale.com/wgengine/router" "tailscale.com/wgengine/router"
"github.com/google/uuid" "github.com/google/uuid"
@@ -27,6 +30,9 @@ import (
type Conn interface { type Conn interface {
CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error)
GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics
Ping(ctx context.Context, agentID uuid.UUID) (time.Duration, bool, *ipnstate.PingResult, error)
Node() *tailnet.Node
DERPMap() *tailcfg.DERPMap
Close() error Close() error
} }
@@ -38,6 +44,10 @@ type vpnConn struct {
updatesCtrl *tailnet.TunnelAllWorkspaceUpdatesController updatesCtrl *tailnet.TunnelAllWorkspaceUpdatesController
} }
// Ping pings the agent with the given ID over the underlying connection.
// The agent's UUID is converted to its address under
// tailnet.TailscaleServicePrefix, and the results of the embedded Conn's Ping
// are returned unchanged.
func (c *vpnConn) Ping(ctx context.Context, agentID uuid.UUID) (time.Duration, bool, *ipnstate.PingResult, error) {
return c.Conn.Ping(ctx, tailnet.TailscaleServicePrefix.AddrFromUUID(agentID))
}
func (c *vpnConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) { func (c *vpnConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) {
return c.updatesCtrl.CurrentState() return c.updatesCtrl.CurrentState()
} }
+2 -7
View File
@@ -23,6 +23,8 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.GoleakOptions...) goleak.VerifyTestMain(m, testutil.GoleakOptions...)
} }
const expectedHandshake = "codervpn tunnel 1.2\n"
// TestSpeaker_RawPeer tests the speaker with a peer that we simulate by directly making reads and // TestSpeaker_RawPeer tests the speaker with a peer that we simulate by directly making reads and
// writes to the other end of the pipe. There should be at least one test that does this, rather // writes to the other end of the pipe. There should be at least one test that does this, rather
// than use 2 speakers so that we don't have a bug where we don't adhere to the stated protocol, but // than use 2 speakers so that we don't have a bug where we don't adhere to the stated protocol, but
@@ -48,8 +50,6 @@ func TestSpeaker_RawPeer(t *testing.T) {
errCh <- err errCh <- err
}() }()
expectedHandshake := "codervpn tunnel 1.1\n"
b := make([]byte, 256) b := make([]byte, 256)
n, err := mp.Read(b) n, err := mp.Read(b)
require.NoError(t, err) require.NoError(t, err)
@@ -157,8 +157,6 @@ func TestSpeaker_OversizeHandshake(t *testing.T) {
errCh <- err errCh <- err
}() }()
expectedHandshake := "codervpn tunnel 1.1\n"
b := make([]byte, 256) b := make([]byte, 256)
n, err := mp.Read(b) n, err := mp.Read(b)
require.NoError(t, err) require.NoError(t, err)
@@ -210,7 +208,6 @@ func TestSpeaker_HandshakeInvalid(t *testing.T) {
_, err = mp.Write([]byte(tc.handshake)) _, err = mp.Write([]byte(tc.handshake))
require.NoError(t, err) require.NoError(t, err)
expectedHandshake := "codervpn tunnel 1.1\n"
b := make([]byte, 256) b := make([]byte, 256)
n, err := mp.Read(b) n, err := mp.Read(b)
require.NoError(t, err) require.NoError(t, err)
@@ -248,8 +245,6 @@ func TestSpeaker_CorruptMessage(t *testing.T) {
errCh <- err errCh <- err
}() }()
expectedHandshake := "codervpn tunnel 1.1\n"
b := make([]byte, 256) b := make([]byte, 256)
n, err := mp.Read(b) n, err := mp.Read(b)
require.NoError(t, err) require.NoError(t, err)
+128 -11
View File
@@ -19,6 +19,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/tailscale/wireguard-go/tun" "github.com/tailscale/wireguard-go/tun"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"tailscale.com/net/dns" "tailscale.com/net/dns"
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
@@ -32,9 +33,9 @@ import (
"github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet"
) )
// netStatusInterval is the interval at which the tunnel sends network status updates to the manager. // netStatusInterval is the interval at which the tunnel records latencies,
// This is currently only used to keep `last_handshake` up to date. // and sends network status updates to the manager.
const netStatusInterval = 10 * time.Second const netStatusInterval = 5 * time.Second
type Tunnel struct { type Tunnel struct {
speaker[*TunnelMessage, *ManagerMessage, ManagerMessage] speaker[*TunnelMessage, *ManagerMessage, ManagerMessage]
@@ -86,8 +87,9 @@ func NewTunnel(
ctx: uCtx, ctx: uCtx,
cancel: uCancel, cancel: uCancel,
netLoopDone: make(chan struct{}), netLoopDone: make(chan struct{}),
logger: logger,
uSendCh: s.sendCh, uSendCh: s.sendCh,
agents: map[uuid.UUID]tailnet.Agent{}, agents: map[uuid.UUID]agentWithPing{},
workspaces: map[uuid.UUID]tailnet.Workspace{}, workspaces: map[uuid.UUID]tailnet.Workspace{},
clock: quartz.NewReal(), clock: quartz.NewReal(),
}, },
@@ -344,10 +346,12 @@ type updater struct {
cancel context.CancelFunc cancel context.CancelFunc
netLoopDone chan struct{} netLoopDone chan struct{}
logger slog.Logger
mu sync.Mutex mu sync.Mutex
uSendCh chan<- *TunnelMessage uSendCh chan<- *TunnelMessage
// agents contains the agents that are currently connected to the tunnel. // agents contains the agents that are currently connected to the tunnel.
agents map[uuid.UUID]tailnet.Agent agents map[uuid.UUID]agentWithPing
// workspaces contains the workspaces to which agents are currently connected via the tunnel. // workspaces contains the workspaces to which agents are currently connected via the tunnel.
workspaces map[uuid.UUID]tailnet.Workspace workspaces map[uuid.UUID]tailnet.Workspace
conn Conn conn Conn
@@ -355,6 +359,26 @@ type updater struct {
clock quartz.Clock clock quartz.Clock
} }
// agentWithPing is a tailnet.Agent together with the result of the most
// recent successful ping to it, if there has been one.
type agentWithPing struct {
tailnet.Agent
// non-nil if a successful ping has been made
lastPing *lastPing
}
// Clone returns a new agentWithPing holding a clone of the embedded agent.
// The lastPing pointer is carried over as-is, so the clone and the original
// refer to the same lastPing value.
func (a *agentWithPing) Clone() *agentWithPing {
	cloned := agentWithPing{lastPing: a.lastPing}
	cloned.Agent = a.Agent.Clone()
	return &cloned
}
// lastPing holds the outcome of a successful ping to an agent. It is the
// in-memory form of the LastPing proto message.
type lastPing struct {
// pingDur is the duration returned by the ping; it is sent as the latency.
pingDur time.Duration
// didP2p is the P2P flag returned by the ping.
didP2p bool
// preferredDerp is the DERP region name resolved for the ping.
preferredDerp string
// preferredDerpLatency is the latency to preferredDerp, or nil when that
// region has no entry in the extracted DERP latency map.
preferredDerpLatency *time.Duration
}
// Update pushes a workspace update to the manager // Update pushes a workspace update to the manager
func (u *updater) Update(update tailnet.WorkspaceUpdate) error { func (u *updater) Update(update tailnet.WorkspaceUpdate) error {
u.mu.Lock() u.mu.Lock()
@@ -412,10 +436,21 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp
DeletedAgents: make([]*Agent, len(update.DeletedAgents)), DeletedAgents: make([]*Agent, len(update.DeletedAgents)),
} }
var upsertedAgentsWithPing []*agentWithPing
// save the workspace update to the tunnel's state, such that it can // save the workspace update to the tunnel's state, such that it can
// be used to populate automated peer updates. // be used to populate automated peer updates.
for _, agent := range update.UpsertedAgents { for _, agent := range update.UpsertedAgents {
u.agents[agent.ID] = agent.Clone() var lastPing *lastPing
if existing, ok := u.agents[agent.ID]; ok {
lastPing = existing.lastPing
}
upsertedAgent := agentWithPing{
Agent: agent.Clone(),
lastPing: lastPing,
}
u.agents[agent.ID] = upsertedAgent
upsertedAgentsWithPing = append(upsertedAgentsWithPing, &upsertedAgent)
} }
for _, agent := range update.DeletedAgents { for _, agent := range update.DeletedAgents {
delete(u.agents, agent.ID) delete(u.agents, agent.ID)
@@ -435,7 +470,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp
} }
} }
upsertedAgents := u.convertAgentsLocked(update.UpsertedAgents) upsertedAgents := u.convertAgentsLocked(upsertedAgentsWithPing)
out.UpsertedAgents = upsertedAgents out.UpsertedAgents = upsertedAgents
for i, ws := range update.DeletedWorkspaces { for i, ws := range update.DeletedWorkspaces {
out.DeletedWorkspaces[i] = &Workspace{ out.DeletedWorkspaces[i] = &Workspace{
@@ -466,7 +501,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp
// convertAgentsLocked takes a list of `tailnet.Agent` and converts them to proto agents. // convertAgentsLocked takes a list of `tailnet.Agent` and converts them to proto agents.
// If there is an active connection, the last handshake time is populated. // If there is an active connection, the last handshake time is populated.
func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent { func (u *updater) convertAgentsLocked(agents []*agentWithPing) []*Agent {
out := make([]*Agent, 0, len(agents)) out := make([]*Agent, 0, len(agents))
for _, agent := range agents { for _, agent := range agents {
@@ -477,12 +512,26 @@ func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent {
sort.Slice(fqdn, func(i, j int) bool { sort.Slice(fqdn, func(i, j int) bool {
return len(fqdn[i]) < len(fqdn[j]) return len(fqdn[i]) < len(fqdn[j])
}) })
var lastPing *LastPing
if agent.lastPing != nil {
var preferredDerpLatency *durationpb.Duration
if agent.lastPing.preferredDerpLatency != nil {
preferredDerpLatency = durationpb.New(*agent.lastPing.preferredDerpLatency)
}
lastPing = &LastPing{
Latency: durationpb.New(agent.lastPing.pingDur),
DidP2P: agent.lastPing.didP2p,
PreferredDerp: agent.lastPing.preferredDerp,
PreferredDerpLatency: preferredDerpLatency,
}
}
protoAgent := &Agent{ protoAgent := &Agent{
Id: tailnet.UUIDToByteSlice(agent.ID), Id: tailnet.UUIDToByteSlice(agent.ID),
Name: agent.Name, Name: agent.Name,
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID), WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
Fqdn: fqdn, Fqdn: fqdn,
IpAddrs: hostsToIPStrings(agent.Hosts), IpAddrs: hostsToIPStrings(agent.Hosts),
LastPing: lastPing,
} }
if u.conn != nil { if u.conn != nil {
diags := u.conn.GetPeerDiagnostics(agent.ID) diags := u.conn.GetPeerDiagnostics(agent.ID)
@@ -514,8 +563,8 @@ func (u *updater) stop() error {
return nil return nil
} }
err := u.conn.Close() err := u.conn.Close()
u.conn = nil
u.cancel() u.cancel()
u.conn = nil
return err return err
} }
@@ -525,7 +574,7 @@ func (u *updater) sendAgentUpdate() {
u.mu.Lock() u.mu.Lock()
defer u.mu.Unlock() defer u.mu.Unlock()
agents := make([]*tailnet.Agent, 0, len(u.agents)) agents := make([]*agentWithPing, 0, len(u.agents))
for _, agent := range u.agents { for _, agent := range u.agents {
agents = append(agents, &agent) agents = append(agents, &agent)
} }
@@ -558,17 +607,85 @@ func (u *updater) netStatusLoop() {
case <-u.ctx.Done(): case <-u.ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
u.recordLatencies()
u.sendAgentUpdate() u.sendAgentUpdate()
} }
} }
} }
// recordLatencies pings every currently known agent and stores the result of
// each successful ping on that agent's entry in u.agents.
//
// The pings run in background goroutines and this method returns without
// waiting for them, so results become visible on later agent updates. Each
// round of pings is bounded by a timeout of netStatusInterval and is cancelled
// when u.ctx is. Nothing is started when the tunnel has no connection or there
// are no agents to ping.
func (u *updater) recordLatencies() {
	// Snapshot the connection and agent IDs under the lock, so the pings
	// themselves run without holding it.
	u.mu.Lock()
	conn := u.conn
	agentIDsToPing := make([]uuid.UUID, 0, len(u.agents))
	for _, agent := range u.agents {
		agentIDsToPing = append(agentIDsToPing, agent.ID)
	}
	u.mu.Unlock()

	if conn == nil {
		u.logger.Debug(u.ctx, "skipping pings as tunnel is not connected")
		return
	}
	if len(agentIDsToPing) == 0 {
		// Nothing to ping; avoid spawning a goroutine and timer for no work.
		return
	}

	go func() {
		// We need a waitgroup to cancel the context after all pings are done.
		var wg sync.WaitGroup
		pingCtx, cancelFunc := context.WithTimeout(u.ctx, netStatusInterval)
		defer cancelFunc()
		for _, agentID := range agentIDsToPing {
			wg.Add(1)
			go func() {
				defer wg.Done()

				pingDur, didP2p, pingResult, err := conn.Ping(pingCtx, agentID)
				if err != nil {
					u.logger.Warn(u.ctx, "failed to ping agent", slog.F("agent_id", agentID), slog.Error(err))
					return
				}

				// We fetch the Node and DERPMap after each ping, as it may have
				// changed.
				node := conn.Node()
				derpMap := conn.DERPMap()
				if node == nil || derpMap == nil {
					u.logger.Warn(u.ctx, "failed to get DERP map or node after ping")
					return
				}
				derpLatencies := tailnet.ExtractDERPLatency(node, derpMap)
				preferredDerp := tailnet.ExtractPreferredDERPName(pingResult, node, derpMap)
				var preferredDerpLatency *time.Duration
				if derpLatency, ok := derpLatencies[preferredDerp]; ok {
					preferredDerpLatency = &derpLatency
				} else {
					u.logger.Debug(u.ctx, "preferred DERP not found in DERP latency map", slog.F("preferred_derp", preferredDerp))
				}

				// Write back results. The agent may have been removed while
				// the ping was in flight, in which case the result is dropped.
				u.mu.Lock()
				defer u.mu.Unlock()
				if agent, ok := u.agents[agentID]; ok {
					agent.lastPing = &lastPing{
						pingDur:              pingDur,
						didP2p:               didP2p,
						preferredDerp:        preferredDerp,
						preferredDerpLatency: preferredDerpLatency,
					}
					u.agents[agentID] = agent
				} else {
					u.logger.Debug(u.ctx, "ignoring ping result for unknown agent", slog.F("agent_id", agentID))
				}
			}()
		}
		wg.Wait()
	}()
}
// processSnapshotUpdate handles the logic when a full state update is received. // processSnapshotUpdate handles the logic when a full state update is received.
// When the tunnel is live, we only receive diffs, but the first packet on any given // When the tunnel is live, we only receive diffs, but the first packet on any given
// reconnect to the tailnet API is a full state. // reconnect to the tailnet API is a full state.
// Without this logic we weren't processing deletes for any workspaces or agents deleted // Without this logic we weren't processing deletes for any workspaces or agents deleted
// while the client was disconnected while the computer was asleep. // while the client was disconnected while the computer was asleep.
func processSnapshotUpdate(update *tailnet.WorkspaceUpdate, agents map[uuid.UUID]tailnet.Agent, workspaces map[uuid.UUID]tailnet.Workspace) { func processSnapshotUpdate(update *tailnet.WorkspaceUpdate, agents map[uuid.UUID]agentWithPing, workspaces map[uuid.UUID]tailnet.Workspace) {
// ignoredWorkspaces is initially populated with the workspaces that are // ignoredWorkspaces is initially populated with the workspaces that are
// in the current update. Later on we populate it with the deleted workspaces too // in the current update. Later on we populate it with the deleted workspaces too
// so that we don't send duplicate updates. Same applies to ignoredAgents. // so that we don't send duplicate updates. Same applies to ignoredAgents.
+263 -6
View File
@@ -15,10 +15,13 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tailcfg"
"tailscale.com/util/dnsname" "tailscale.com/util/dnsname"
"github.com/coder/quartz" "github.com/coder/quartz"
maputil "github.com/coder/coder/v2/coderd/util/maps"
"github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto" "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/testutil" "github.com/coder/coder/v2/testutil"
@@ -57,15 +60,59 @@ func newFakeConn(state tailnet.WorkspaceUpdate, hsTime time.Time) *fakeConn {
} }
} }
// withManualPings sets returnPing to a new unbuffered channel and returns the
// same fakeConn for chaining. With returnPing set, Ping blocks until a value
// is received from that channel or its context is done.
func (f *fakeConn) withManualPings() *fakeConn {
f.returnPing = make(chan struct{})
return f
}
type fakeConn struct { type fakeConn struct {
state tailnet.WorkspaceUpdate state tailnet.WorkspaceUpdate
returnPing chan struct{}
hsTime time.Time hsTime time.Time
closed chan struct{} closed chan struct{}
doClose sync.Once doClose sync.Once
} }
// DERPMap returns a fixed DERP map containing a single region, ID 999, named
// "Coder Region".
func (*fakeConn) DERPMap() *tailcfg.DERPMap {
return &tailcfg.DERPMap{
Regions: map[int]*tailcfg.DERPRegion{
999: {
RegionID: 999,
RegionCode: "zzz",
RegionName: "Coder Region",
},
},
}
}
// Node returns a fixed node that prefers DERP region 999 and reports a single
// DERP latency entry of 0.1 under the key "999".
func (*fakeConn) Node() *tailnet.Node {
return &tailnet.Node{
PreferredDERP: 999,
DERPLatency: map[string]float64{
"999": 0.1,
},
}
}
var _ Conn = (*fakeConn)(nil) var _ Conn = (*fakeConn)(nil)
// Ping reports a successful 100ms P2P ping through DERP region 999. When
// returnPing is set, the call first blocks until a value is received from
// returnPing, or returns ctx.Err() if the context is done before that.
func (f *fakeConn) Ping(ctx context.Context, agentID uuid.UUID) (time.Duration, bool, *ipnstate.PingResult, error) {
	if f.returnPing != nil {
		select {
		case <-ctx.Done():
			return 0, false, nil, ctx.Err()
		case <-f.returnPing:
			// Released by the test; fall through to the canned result.
		}
	}

	result := &ipnstate.PingResult{
		DERPRegionID: 999,
	}
	return time.Millisecond * 100, true, result, nil
}
func (f *fakeConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) { func (f *fakeConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) {
return f.state, nil return f.state, nil
} }
@@ -292,7 +339,7 @@ func TestUpdater_createPeerUpdate(t *testing.T) {
updater := updater{ updater := updater{
ctx: ctx, ctx: ctx,
netLoopDone: make(chan struct{}), netLoopDone: make(chan struct{}),
agents: map[uuid.UUID]tailnet.Agent{}, agents: map[uuid.UUID]agentWithPing{},
workspaces: map[uuid.UUID]tailnet.Workspace{}, workspaces: map[uuid.UUID]tailnet.Workspace{},
conn: newFakeConn(tailnet.WorkspaceUpdate{}, hsTime), conn: newFakeConn(tailnet.WorkspaceUpdate{}, hsTime),
} }
@@ -430,6 +477,22 @@ func TestTunnel_sendAgentUpdate(t *testing.T) {
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime()) require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
} }
// Latency is gathered in the background, so it'll eventually be sent
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
if len(req.msg.GetPeerUpdate().UpsertedAgents) == 0 {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing == nil {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds() != 100 {
return false
}
return req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.PreferredDerp == "Coder Region"
}, testutil.IntervalFast)
// Upsert a new agent // Upsert a new agent
err = tun.Update(tailnet.WorkspaceUpdate{ err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{}, UpsertedWorkspaces: []*tailnet.Workspace{},
@@ -459,6 +522,10 @@ func TestTunnel_sendAgentUpdate(t *testing.T) {
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id) require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime()) require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
// The latency of the first agent is still set
require.NotNil(t, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
require.EqualValues(t, 100, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds())
require.Equal(t, aID2[:], req.msg.GetPeerUpdate().UpsertedAgents[1].Id) require.Equal(t, aID2[:], req.msg.GetPeerUpdate().UpsertedAgents[1].Id)
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[1].LastHandshake.AsTime()) require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[1].LastHandshake.AsTime())
@@ -486,6 +553,22 @@ func TestTunnel_sendAgentUpdate(t *testing.T) {
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1) require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID2[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id) require.Equal(t, aID2[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime()) require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
// Eventually the second agent's latency is set
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
if len(req.msg.GetPeerUpdate().UpsertedAgents) == 0 {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing == nil {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds() != 100 {
return false
}
return req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.PreferredDerp == "Coder Region"
}, testutil.IntervalFast)
} }
func TestTunnel_sendAgentUpdateReconnect(t *testing.T) { func TestTunnel_sendAgentUpdateReconnect(t *testing.T) {
@@ -693,6 +776,178 @@ func TestTunnel_sendAgentUpdateWorkspaceReconnect(t *testing.T) {
require.Equal(t, wID1[:], peerUpdate.DeletedWorkspaces[0].Id) require.Equal(t, wID1[:], peerUpdate.DeletedWorkspaces[0].Id)
} }
// TestTunnel_slowPing checks that an agent whose ping has not returned yet is
// still reported in the periodic peer updates, just without a LastPing, and
// that once the ping is allowed to return a later update carries the result
// this test expects (100ms latency, preferred DERP "Coder Region").
func TestTunnel_slowPing(t *testing.T) {
	t.Parallel()

	ctx := testutil.Context(t, testutil.WaitShort)

	mClock := quartz.NewMock(t)

	wID1 := uuid.UUID{1}
	aID1 := uuid.UUID{2}
	hsTime := time.Now().Add(-time.Minute).UTC()

	client := newFakeClient(ctx, t)
	// NOTE(review): withManualPings presumably makes pings block until the
	// test sends on conn.returnPing (as done further down) — confirm against
	// the fake conn implementation.
	conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime).withManualPings()

	tun, mgr := setupTunnel(t, ctx, client, mClock)
	errCh := make(chan error, 1)
	var resp *TunnelMessage
	// The Start RPC runs in its own goroutine because the test has to hand
	// the fake conn to the client (below) before waiting for the RPC result.
	// resp is written before the send on errCh, so reading it after the
	// receive is race-free.
	go func() {
		r, err := mgr.unaryRPC(ctx, &ManagerMessage{
			Msg: &ManagerMessage_Start{
				Start: &StartRequest{
					TunnelFileDescriptor: 2,
					CoderUrl:             "https://coder.example.com",
					ApiToken:             "fakeToken",
				},
			},
		})
		resp = r
		errCh <- err
	}()
	testutil.RequireSend(ctx, t, client.ch, conn)
	err := testutil.TryReceive(ctx, t, errCh)
	require.NoError(t, err)
	_, ok := resp.Msg.(*TunnelMessage_Start)
	require.True(t, ok)

	// Inform the tunnel of the initial state
	err = tun.Update(tailnet.WorkspaceUpdate{
		UpsertedWorkspaces: []*tailnet.Workspace{
			{
				ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
			},
		},
		UpsertedAgents: []*tailnet.Agent{
			{
				ID:          aID1,
				Name:        "agent1",
				WorkspaceID: wID1,
				Hosts: map[dnsname.FQDN][]netip.Addr{
					"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
				},
			},
		},
	})
	require.NoError(t, err)
	req := testutil.TryReceive(ctx, t, mgr.requests)
	require.Nil(t, req.msg.Rpc)
	require.NotNil(t, req.msg.GetPeerUpdate())
	require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
	require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)

	// We can't check that it *never* pings, so the best we can do is
	// check it doesn't ping even with 5 goroutines attempting to,
	// and that updates are received as normal
	for range 5 {
		mClock.AdvanceNext()
		// Receive the update produced by this tick. Without this the
		// assertion below would only re-check the stale message received
		// before the loop and would verify nothing about the new updates.
		req = testutil.TryReceive(ctx, t, mgr.requests)
		require.Nil(t, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
	}

	// Provided that it hasn't been 5 seconds since the last AdvanceNext call,
	// there'll be a ping in-flight that will return with this message
	testutil.RequireSend(ctx, t, conn.returnPing, struct{}{})

	// Which will mean we'll eventually receive a PeerUpdate with the ping
	testutil.Eventually(ctx, t, func(ctx context.Context) bool {
		mClock.AdvanceNext()
		req = testutil.TryReceive(ctx, t, mgr.requests)
		if len(req.msg.GetPeerUpdate().UpsertedAgents) == 0 {
			return false
		}
		if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing == nil {
			return false
		}
		if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds() != 100 {
			return false
		}
		return req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.PreferredDerp == "Coder Region"
	}, testutil.IntervalFast)
}
// TestTunnel_stopMidPing checks that the tunnel can be stopped while agent
// pings are still outstanding: the Stop RPC must succeed and the underlying
// connection must be closed, even though no ping was ever allowed to return.
func TestTunnel_stopMidPing(t *testing.T) {
	t.Parallel()

	ctx := testutil.Context(t, testutil.WaitShort)
	mClock := quartz.NewMock(t)

	workspaceID := uuid.UUID{1}
	agentID := uuid.UUID{2}
	handshake := time.Now().Add(-time.Minute).UTC()

	client := newFakeClient(ctx, t)
	// NOTE(review): withManualPings presumably keeps pings pending until the
	// test releases them; this test never does, so they stay in flight —
	// confirm against the fake conn implementation.
	conn := newFakeConn(tailnet.WorkspaceUpdate{}, handshake).withManualPings()

	tun, mgr := setupTunnel(t, ctx, client, mClock)

	// rpcErr is buffered so the RPC goroutine can always finish; reply is
	// written before the send, so reading it after the receive is race-free.
	rpcErr := make(chan error, 1)
	var reply *TunnelMessage
	callAsync := func(msg *ManagerMessage) {
		go func() {
			r, err := mgr.unaryRPC(ctx, msg)
			reply = r
			rpcErr <- err
		}()
	}

	// Start the tunnel, handing it the fake conn once the client asks for it.
	callAsync(&ManagerMessage{
		Msg: &ManagerMessage_Start{
			Start: &StartRequest{
				TunnelFileDescriptor: 2,
				CoderUrl:             "https://coder.example.com",
				ApiToken:             "fakeToken",
			},
		},
	})
	testutil.RequireSend(ctx, t, client.ch, conn)
	err := testutil.TryReceive(ctx, t, rpcErr)
	require.NoError(t, err)
	_, isStart := reply.Msg.(*TunnelMessage_Start)
	require.True(t, isStart)

	// Seed the tunnel with a single workspace that has a single agent.
	initial := tailnet.WorkspaceUpdate{
		UpsertedWorkspaces: []*tailnet.Workspace{
			{
				ID: workspaceID, Name: "w1", Status: proto.Workspace_STARTING,
			},
		},
		UpsertedAgents: []*tailnet.Agent{
			{
				ID:          agentID,
				Name:        "agent1",
				WorkspaceID: workspaceID,
				Hosts: map[dnsname.FQDN][]netip.Addr{
					"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
				},
			},
		},
	}
	err = tun.Update(initial)
	require.NoError(t, err)

	update := testutil.TryReceive(ctx, t, mgr.requests)
	require.Nil(t, update.msg.Rpc)
	require.NotNil(t, update.msg.GetPeerUpdate())
	require.Len(t, update.msg.GetPeerUpdate().UpsertedAgents, 1)
	require.Equal(t, agentID[:], update.msg.GetPeerUpdate().UpsertedAgents[0].Id)

	// Tick the clock a few times so that pings are in flight when we stop.
	// Every tick must still yield an update, and none may carry a ping result.
	for i := 0; i < 5; i++ {
		mClock.AdvanceNext()
		update = testutil.TryReceive(ctx, t, mgr.requests)
		require.Nil(t, update.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
	}

	// Stop the tunnel: the conn must be closed and the RPC must succeed.
	callAsync(&ManagerMessage{
		Msg: &ManagerMessage_Stop{},
	})
	testutil.TryReceive(ctx, t, conn.closed)
	err = testutil.TryReceive(ctx, t, rpcErr)
	require.NoError(t, err)
	_, isStop := reply.Msg.(*TunnelMessage_Stop)
	require.True(t, isStop)
}
//nolint:revive // t takes precedence //nolint:revive // t takes precedence
func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient, mClock *quartz.Mock) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) { func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient, mClock *quartz.Mock) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) {
mp, tp := net.Pipe() mp, tp := net.Pipe()
@@ -902,11 +1157,13 @@ func TestProcessFreshState(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
t.Parallel() t.Parallel()
agentsCopy := make(map[uuid.UUID]tailnet.Agent) agentsCopy := maputil.Map(tt.initialAgents, func(a tailnet.Agent) agentWithPing {
maps.Copy(agentsCopy, tt.initialAgents) return agentWithPing{
Agent: a.Clone(),
workspaceCopy := make(map[uuid.UUID]tailnet.Workspace) lastPing: nil,
maps.Copy(workspaceCopy, tt.initialWorkspaces) }
})
workspaceCopy := maps.Clone(tt.initialWorkspaces)
processSnapshotUpdate(tt.update, agentsCopy, workspaceCopy) processSnapshotUpdate(tt.update, agentsCopy, workspaceCopy)
+8 -1
View File
@@ -16,7 +16,14 @@ var CurrentSupportedVersions = RPCVersionList{
// - device_id: Coder Desktop device ID // - device_id: Coder Desktop device ID
// - device_os: Coder Desktop OS information // - device_os: Coder Desktop OS information
// - coder_desktop_version: Coder Desktop version // - coder_desktop_version: Coder Desktop version
{Major: 1, Minor: 1}, // 1.2 adds network related information to Agent:
// - last_ping:
// - latency: RTT of the most recently sent ping
// - did_p2p: Whether the last ping was sent over P2P
// - preferred_derp: The server that DERP relayed connections are
// using, if they're not using P2P.
// - preferred_derp_latency: The latency to the preferred DERP region
{Major: 1, Minor: 2},
}, },
} }
+371 -244
View File
File diff suppressed because it is too large Load Diff
+16
View File
@@ -3,6 +3,7 @@ option go_package = "github.com/coder/coder/v2/vpn";
option csharp_namespace = "Coder.Desktop.Vpn.Proto"; option csharp_namespace = "Coder.Desktop.Vpn.Proto";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
package vpn; package vpn;
@@ -130,6 +131,21 @@ message Agent {
// last_handshake is the primary indicator of whether we are connected to a peer. Zero value or // last_handshake is the primary indicator of whether we are connected to a peer. Zero value or
// anything longer than 5 minutes ago means there is a problem. // anything longer than 5 minutes ago means there is a problem.
google.protobuf.Timestamp last_handshake = 6; google.protobuf.Timestamp last_handshake = 6;
// If unset, a successful ping has not yet been made.
optional LastPing last_ping = 7;
}
message LastPing {
// latency is the RTT of the ping to the agent.
google.protobuf.Duration latency = 1;
// did_p2p indicates whether the ping was sent P2P, or over DERP.
bool did_p2p = 2;
// preferred_derp is the human readable name of the preferred DERP region,
// or the region used for the last ping, if it was sent over DERP.
string preferred_derp = 3;
// preferred_derp_latency is the last known latency to the preferred DERP
// region. Unset if the region does not appear in the DERP map.
optional google.protobuf.Duration preferred_derp_latency = 4;
} }
// NetworkSettingsRequest is based on // NetworkSettingsRequest is based on