chore: add DRPC server implementation for network telemetry (#13675)

This commit is contained in:
Dean Sheather
2024-07-02 01:50:52 +10:00
committed by GitHub
parent 2fde054e10
commit 6c94dd4f23
14 changed files with 1192 additions and 557 deletions
+6 -6
View File
@@ -22,7 +22,6 @@ import (
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk"
@@ -60,11 +59,11 @@ type Options struct {
Pubsub pubsub.Pubsub
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsReporter *workspacestats.Reporter
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
AccessURL *url.URL
AppHostname string
@@ -154,10 +153,11 @@ func New(opts Options) *API {
}
api.DRPCService = &tailnet.DRPCService{
CoordPtr: opts.TailnetCoordinator,
Logger: opts.Log,
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
DerpMapFn: opts.DerpMapFn,
CoordPtr: opts.TailnetCoordinator,
Logger: opts.Log,
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
DerpMapFn: opts.DerpMapFn,
NetworkTelemetryHandler: opts.NetworkTelemetryHandler,
}
return api
+37 -14
View File
@@ -39,6 +39,7 @@ import (
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/clock"
_ "github.com/coder/coder/v2/coderd/apidoc" // Used for swagger docs.
"github.com/coder/coder/v2/coderd/appearance"
"github.com/coder/coder/v2/coderd/audit"
@@ -142,14 +143,16 @@ type Options struct {
DERPServer *derp.Server
// BaseDERPMap is used as the base DERP map for all clients and agents.
// Proxies are added to this list.
BaseDERPMap *tailcfg.DERPMap
DERPMapUpdateFrequency time.Duration
SwaggerEndpoint bool
SetUserGroups func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, orgGroupNames map[uuid.UUID][]string, createMissingGroups bool) error
SetUserSiteRoles func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, roles []string) error
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
BaseDERPMap *tailcfg.DERPMap
DERPMapUpdateFrequency time.Duration
NetworkTelemetryBatchFrequency time.Duration
NetworkTelemetryBatchMaxSize int
SwaggerEndpoint bool
SetUserGroups func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, orgGroupNames map[uuid.UUID][]string, createMissingGroups bool) error
SetUserSiteRoles func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, roles []string) error
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
// AppSecurityKey is the crypto key used to sign and encrypt tokens related to
// workspace applications. It consists of both a signing and encryption key.
AppSecurityKey workspaceapps.SecurityKey
@@ -305,6 +308,12 @@ func New(options *Options) *API {
if options.DERPMapUpdateFrequency == 0 {
options.DERPMapUpdateFrequency = 5 * time.Second
}
if options.NetworkTelemetryBatchFrequency == 0 {
options.NetworkTelemetryBatchFrequency = 1 * time.Minute
}
if options.NetworkTelemetryBatchMaxSize == 0 {
options.NetworkTelemetryBatchMaxSize = 1_000
}
if options.TailnetCoordinator == nil {
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
}
@@ -539,12 +548,19 @@ func New(options *Options) *API {
if options.DeploymentValues.Prometheus.Enable {
options.PrometheusRegistry.MustRegister(stn)
}
api.TailnetClientService, err = tailnet.NewClientService(
api.Logger.Named("tailnetclient"),
&api.TailnetCoordinator,
api.Options.DERPMapUpdateFrequency,
api.DERPMap,
api.NetworkTelemetryBatcher = tailnet.NewNetworkTelemetryBatcher(
clock.NewReal(),
api.Options.NetworkTelemetryBatchFrequency,
api.Options.NetworkTelemetryBatchMaxSize,
api.handleNetworkTelemetry,
)
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: api.Logger.Named("tailnetclient"),
CoordPtr: &api.TailnetCoordinator,
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
DERPMapFn: api.DERPMap,
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
})
if err != nil {
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
}
@@ -1255,6 +1271,7 @@ type API struct {
Auditor atomic.Pointer[audit.Auditor]
WorkspaceClientCoordinateOverride atomic.Pointer[func(rw http.ResponseWriter) bool]
TailnetCoordinator atomic.Pointer[tailnet.Coordinator]
NetworkTelemetryBatcher *tailnet.NetworkTelemetryBatcher
TailnetClientService *tailnet.ClientService
QuotaCommitter atomic.Pointer[proto.QuotaCommitter]
AppearanceFetcher atomic.Pointer[appearance.Fetcher]
@@ -1313,7 +1330,12 @@ type API struct {
// Close waits for all WebSocket connections to drain before returning.
func (api *API) Close() error {
api.cancel()
select {
case <-api.ctx.Done():
return xerrors.New("API already closed")
default:
api.cancel()
}
if api.derpCloseFunc != nil {
api.derpCloseFunc()
}
@@ -1348,6 +1370,7 @@ func (api *API) Close() error {
}
_ = api.agentProvider.Close()
_ = api.statsReporter.Close()
_ = api.NetworkTelemetryBatcher.Close()
return nil
}
+305
View File
@@ -20,6 +20,8 @@ import (
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
"cdr.dev/slog"
"github.com/coder/coder/v2/buildinfo"
@@ -27,6 +29,7 @@ import (
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
)
const (
@@ -795,6 +798,7 @@ type Snapshot struct {
WorkspaceResourceMetadata []WorkspaceResourceMetadata `json:"workspace_resource_metadata"`
WorkspaceResources []WorkspaceResource `json:"workspace_resources"`
Workspaces []Workspace `json:"workspaces"`
NetworkEvents []NetworkEvent `json:"network_events"`
}
// Deployment contains information about the host running Coder.
@@ -1006,6 +1010,307 @@ type ExternalProvisioner struct {
ShutdownAt *time.Time `json:"shutdown_at"`
}
// NetworkEventIPFields is the JSON telemetry representation of the coarse
// properties of an IP address.
type NetworkEventIPFields struct {
	// Version is the IP version: 4 or 6.
	Version int32 `json:"version"`
	// Class is one of: public, private, link_local, unique_local, loopback.
	Class string `json:"class"`
}
// ipFieldsFromProto converts a protobuf IPFields message into its telemetry
// form. A nil message yields the zero value. The class enum is rendered as its
// lower-cased protobuf name.
func ipFieldsFromProto(proto *tailnetproto.IPFields) NetworkEventIPFields {
	var fields NetworkEventIPFields
	if proto != nil {
		fields.Version = proto.Version
		fields.Class = strings.ToLower(proto.Class.String())
	}
	return fields
}
// NetworkEventP2PEndpoint is the JSON telemetry representation of the
// TelemetryEvent P2PEndpoint protobuf message.
type NetworkEventP2PEndpoint struct {
	// Hash is copied verbatim from the protobuf message's hash field.
	Hash string `json:"hash"`
	// Port is the protobuf int32 port widened to int.
	Port int `json:"port"`
	// Fields holds the IP version and class of the endpoint address.
	Fields NetworkEventIPFields `json:"fields"`
}
// p2pEndpointFromProto converts a protobuf P2PEndpoint into its telemetry
// form. A nil message yields the zero value.
func p2pEndpointFromProto(proto *tailnetproto.TelemetryEvent_P2PEndpoint) NetworkEventP2PEndpoint {
	var endpoint NetworkEventP2PEndpoint
	if proto != nil {
		endpoint.Hash = proto.Hash
		endpoint.Port = int(proto.Port)
		endpoint.Fields = ipFieldsFromProto(proto.Fields)
	}
	return endpoint
}
// DERPMapHomeParams is the JSON telemetry representation of the DERPMap
// HomeParams protobuf message.
type DERPMapHomeParams struct {
	// RegionScore maps an int64 key to a float64 score, copied from the
	// protobuf message's RegionScore map.
	RegionScore map[int64]float64 `json:"region_score"`
}
// derpMapHomeParamsFromProto converts protobuf HomeParams into their telemetry
// form. A nil message yields the zero value (nil map); a non-nil message
// always yields a non-nil map, even when it has no entries.
func derpMapHomeParamsFromProto(proto *tailnetproto.DERPMap_HomeParams) DERPMapHomeParams {
	if proto == nil {
		return DERPMapHomeParams{}
	}

	scores := make(map[int64]float64, len(proto.RegionScore))
	for regionID, score := range proto.RegionScore {
		scores[regionID] = score
	}
	return DERPMapHomeParams{RegionScore: scores}
}
// DERPRegion is the JSON telemetry representation of the DERPMap Region
// protobuf message.
type DERPRegion struct {
	RegionID      int64 `json:"region_id"`
	EmbeddedRelay bool  `json:"embedded_relay"`

	// The next three fields carry no json tag, so encoding/json emits them
	// under their Go field names ("RegionCode", "RegionName", "Avoid").
	RegionCode string
	RegionName string
	Avoid      bool

	// Nodes lists the converted nodes of the region.
	Nodes []DERPNode `json:"nodes"`
}
// derpRegionFromProto converts a protobuf DERP region into its telemetry form.
// A nil message yields the zero value. For a non-nil message the Nodes slice
// is always non-nil, even when the region has no nodes.
func derpRegionFromProto(proto *tailnetproto.DERPMap_Region) DERPRegion {
	if proto == nil {
		return DERPRegion{}
	}

	region := DERPRegion{
		RegionID:      proto.RegionId,
		EmbeddedRelay: proto.EmbeddedRelay,
		RegionCode:    proto.RegionCode,
		RegionName:    proto.RegionName,
		Avoid:         proto.Avoid,
		Nodes:         make([]DERPNode, len(proto.Nodes)),
	}
	for i, protoNode := range proto.Nodes {
		region.Nodes[i] = derpNodeFromProto(protoNode)
	}
	return region
}
// DERPNode is the JSON telemetry representation of the DERPMap Region Node
// protobuf message. Every field is copied one-to-one from the protobuf
// message of the same name.
type DERPNode struct {
	// Identity of the node and the region it belongs to.
	Name     string `json:"name"`
	RegionID int64  `json:"region_id"`

	// Addressing information.
	HostName string `json:"host_name"`
	CertName string `json:"cert_name"`
	IPv4     string `json:"ipv4"`
	IPv6     string `json:"ipv6"`

	// STUN and DERP settings.
	STUNPort         int32  `json:"stun_port"`
	STUNOnly         bool   `json:"stun_only"`
	DERPPort         int32  `json:"derp_port"`
	InsecureForTests bool   `json:"insecure_for_tests"`
	ForceHTTP        bool   `json:"force_http"`
	STUNTestIP       string `json:"stun_test_ip"`
	CanPort80        bool   `json:"can_port_80"`
}
// derpNodeFromProto converts a protobuf DERP node into its telemetry form.
// A nil message yields the zero value.
func derpNodeFromProto(proto *tailnetproto.DERPMap_Region_Node) DERPNode {
	var node DERPNode
	if proto == nil {
		return node
	}

	node.Name = proto.Name
	node.RegionID = proto.RegionId
	node.HostName = proto.HostName
	node.CertName = proto.CertName
	node.IPv4 = proto.Ipv4
	node.IPv6 = proto.Ipv6
	node.STUNPort = proto.StunPort
	node.STUNOnly = proto.StunOnly
	node.DERPPort = proto.DerpPort
	node.InsecureForTests = proto.InsecureForTests
	node.ForceHTTP = proto.ForceHttp
	node.STUNTestIP = proto.StunTestIp
	node.CanPort80 = proto.CanPort_80
	return node
}
// DERPMap is the JSON telemetry representation of the DERPMap protobuf
// message.
type DERPMap struct {
	HomeParams DERPMapHomeParams `json:"home_params"`
	// Regions has no json tag, so encoding/json emits it under its Go field
	// name ("Regions").
	Regions map[int64]DERPRegion
}
// derpMapFromProto converts a protobuf DERP map into its telemetry form.
// A nil message yields the zero value. For a non-nil message the Regions map
// is always non-nil, even when there are no regions.
func derpMapFromProto(proto *tailnetproto.DERPMap) DERPMap {
	if proto == nil {
		return DERPMap{}
	}

	regions := make(map[int64]DERPRegion, len(proto.Regions))
	for regionID, protoRegion := range proto.Regions {
		regions[regionID] = derpRegionFromProto(protoRegion)
	}

	var out DERPMap
	out.HomeParams = derpMapHomeParamsFromProto(proto.HomeParams)
	out.Regions = regions
	return out
}
// NetcheckIP is the JSON telemetry representation of the Netcheck NetcheckIP
// protobuf message.
type NetcheckIP struct {
	// Hash is copied verbatim from the protobuf message's hash field.
	Hash string `json:"hash"`
	// Fields holds the IP version and class of the address.
	Fields NetworkEventIPFields `json:"fields"`
}
// netcheckIPFromProto converts a protobuf NetcheckIP into its telemetry form.
// A nil message yields the zero value.
func netcheckIPFromProto(proto *tailnetproto.Netcheck_NetcheckIP) NetcheckIP {
	var ip NetcheckIP
	if proto != nil {
		ip.Hash = proto.Hash
		ip.Fields = ipFieldsFromProto(proto.Fields)
	}
	return ip
}
// Netcheck is the JSON telemetry representation of the Netcheck protobuf
// message. Pointer-typed booleans are nil when the corresponding protobuf
// wrapper value was not set.
type Netcheck struct {
	// Plain booleans copied directly from the protobuf message.
	UDP         bool `json:"udp"`
	IPv6        bool `json:"ipv6"`
	IPv4        bool `json:"ipv4"`
	IPv6CanSend bool `json:"ipv6_can_send"`
	IPv4CanSend bool `json:"ipv4_can_send"`
	OSHasIPv6   bool `json:"os_has_ipv6"`
	ICMPv4      bool `json:"icmpv4"`

	// Optional booleans; nil means the value was absent in the message.
	MappingVariesByDestIP *bool `json:"mapping_varies_by_dest_ip"`
	HairPinning           *bool `json:"hair_pinning"`
	UPnP                  *bool `json:"upnp"`
	PMP                   *bool `json:"pmp"`
	PCP                   *bool `json:"pcp"`

	PreferredDERP int64 `json:"preferred_derp"`

	// Latency maps keyed by int64, converted from protobuf durations.
	RegionLatency   map[int64]time.Duration `json:"region_latency"`
	RegionV4Latency map[int64]time.Duration `json:"region_v4_latency"`
	RegionV6Latency map[int64]time.Duration `json:"region_v6_latency"`

	GlobalV4 NetcheckIP `json:"global_v4"`
	GlobalV6 NetcheckIP `json:"global_v6"`

	CaptivePortal *bool `json:"captive_portal"`
}
// protoBool unwraps an optional protobuf bool. It returns nil when the wrapper
// is nil; otherwise it returns a pointer to the wrapper's own Value field (the
// result aliases the message rather than copying it).
func protoBool(b *wrapperspb.BoolValue) *bool {
	if b != nil {
		return &b.Value
	}
	return nil
}
// netcheckFromProto converts a protobuf Netcheck report into its telemetry
// form. A nil message yields the zero value. For a non-nil message the three
// latency maps are always non-nil, even when empty.
func netcheckFromProto(proto *tailnetproto.Netcheck) Netcheck {
	if proto == nil {
		return Netcheck{}
	}

	// toDurations copies a map of protobuf durations into Go durations.
	toDurations := func(in map[int64]*durationpb.Duration) map[int64]time.Duration {
		converted := make(map[int64]time.Duration, len(in))
		for key, d := range in {
			converted[key] = d.AsDuration()
		}
		return converted
	}

	var report Netcheck

	report.UDP = proto.UDP
	report.IPv6 = proto.IPv6
	report.IPv4 = proto.IPv4
	report.IPv6CanSend = proto.IPv6CanSend
	report.IPv4CanSend = proto.IPv4CanSend
	report.OSHasIPv6 = proto.OSHasIPv6
	report.ICMPv4 = proto.ICMPv4

	report.MappingVariesByDestIP = protoBool(proto.MappingVariesByDestIP)
	report.HairPinning = protoBool(proto.HairPinning)
	report.UPnP = protoBool(proto.UPnP)
	report.PMP = protoBool(proto.PMP)
	report.PCP = protoBool(proto.PCP)

	report.PreferredDERP = proto.PreferredDERP

	report.RegionLatency = toDurations(proto.RegionLatency)
	report.RegionV4Latency = toDurations(proto.RegionV4Latency)
	report.RegionV6Latency = toDurations(proto.RegionV6Latency)

	report.GlobalV4 = netcheckIPFromProto(proto.GlobalV4)
	report.GlobalV6 = netcheckIPFromProto(proto.GlobalV6)

	report.CaptivePortal = protoBool(proto.CaptivePortal)

	return report
}
// NetworkEvent is the JSON telemetry representation of a tailnet
// TelemetryEvent. It and all related structs come from tailnet.proto.
//
// Pointer-typed fields are nil when the corresponding optional protobuf value
// was not set on the event.
type NetworkEvent struct {
	ID          uuid.UUID `json:"id"`
	Time        time.Time `json:"time"`
	Application string    `json:"application"`
	// Status is one of: connected, disconnected.
	Status              string `json:"status"`
	DisconnectionReason string `json:"disconnection_reason"`
	// ClientType is one of: cli, agent, coderd, wsproxy.
	ClientType string `json:"client_type"`

	NodeIDSelf   uint64                          `json:"node_id_self"`
	NodeIDRemote uint64                          `json:"node_id_remote"`
	P2PEndpoint  NetworkEventP2PEndpoint         `json:"p2p_endpoint"`
	LogIPHashes  map[string]NetworkEventIPFields `json:"log_ip_hashes"`
	HomeDERP     string                          `json:"home_derp"`
	Logs         []string                        `json:"logs"`

	DERPMap        DERPMap  `json:"derp_map"`
	LatestNetcheck Netcheck `json:"latest_netcheck"`

	ConnectionAge   *time.Duration `json:"connection_age"`
	ConnectionSetup *time.Duration `json:"connection_setup"`
	P2PSetup        *time.Duration `json:"p2p_setup"`
	DERPLatency     *time.Duration `json:"derp_latency"`
	P2PLatency      *time.Duration `json:"p2p_latency"`
	ThroughputMbits *float32       `json:"throughput_mbits"`
}
// protoFloat unwraps an optional protobuf float. It returns nil when the
// wrapper is nil; otherwise it returns a pointer to the wrapper's own Value
// field (the result aliases the message rather than copying it).
func protoFloat(f *wrapperspb.FloatValue) *float32 {
	if f != nil {
		return &f.Value
	}
	return nil
}
// protoDurationNil converts an optional protobuf duration into an optional Go
// duration, preserving absence: a nil input yields a nil result.
func protoDurationNil(d *durationpb.Duration) *time.Duration {
	if d != nil {
		converted := d.AsDuration()
		return &converted
	}
	return nil
}
// NetworkEventFromProto converts a tailnet TelemetryEvent into the telemetry
// NetworkEvent that is reported in a Snapshot.
//
// It returns an error when proto is nil or when its ID cannot be decoded as a
// UUID. The ID is accepted either as the raw 16-byte UUID value or in any
// textual encoding understood by uuid.ParseBytes. On error the returned
// NetworkEvent is the zero value and must not be used.
func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, error) {
	if proto == nil {
		return NetworkEvent{}, xerrors.New("nil event")
	}

	// The ID travels in a protobuf `bytes` field. uuid.ParseBytes only handles
	// textual encodings, so a raw 16-byte UUID would always be rejected and the
	// event dropped. No textual encoding is 16 bytes long, which makes the
	// length an unambiguous discriminator.
	var (
		id  uuid.UUID
		err error
	)
	if len(proto.Id) == len(id) {
		id, err = uuid.FromBytes(proto.Id)
	} else {
		id, err = uuid.ParseBytes(proto.Id)
	}
	if err != nil {
		return NetworkEvent{}, xerrors.Errorf("parse id %q: %w", proto.Id, err)
	}

	// Always produce a non-nil map so the field's shape does not depend on
	// whether the event carried any hashes.
	logIPHashes := make(map[string]NetworkEventIPFields, len(proto.LogIpHashes))
	for k, v := range proto.LogIpHashes {
		logIPHashes[k] = ipFieldsFromProto(v)
	}

	return NetworkEvent{
		ID:                  id,
		Time:                proto.Time.AsTime(),
		Application:         proto.Application,
		Status:              strings.ToLower(proto.Status.String()),
		DisconnectionReason: proto.DisconnectionReason,
		ClientType:          strings.ToLower(proto.ClientType.String()),
		NodeIDSelf:          proto.NodeIdSelf,
		NodeIDRemote:        proto.NodeIdRemote,
		P2PEndpoint:         p2pEndpointFromProto(proto.P2PEndpoint),
		LogIPHashes:         logIPHashes,
		HomeDERP:            proto.HomeDerp,
		Logs:                proto.Logs,
		DERPMap:             derpMapFromProto(proto.DerpMap),
		LatestNetcheck:      netcheckFromProto(proto.LatestNetcheck),
		ConnectionAge:       protoDurationNil(proto.ConnectionAge),
		ConnectionSetup:     protoDurationNil(proto.ConnectionSetup),
		P2PSetup:            protoDurationNil(proto.P2PSetup),
		DERPLatency:         protoDurationNil(proto.DerpLatency),
		P2PLatency:          protoDurationNil(proto.P2PLatency),
		ThroughputMbits:     protoFloat(proto.ThroughputMbits),
	}, nil
}
// noopReporter has a Report method that discards every snapshot.
type noopReporter struct{}

// Report ignores the snapshot and does nothing.
func (*noopReporter) Report(_ *Snapshot) {
}
+19 -1
View File
@@ -24,9 +24,11 @@ import (
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
)
// @Summary Workspace agent RPC API
@@ -130,11 +132,11 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
Pubsub: api.Pubsub,
DerpMapFn: api.DERPMap,
TailnetCoordinator: &api.TailnetCoordinator,
TemplateScheduleStore: api.TemplateScheduleStore,
AppearanceFetcher: &api.AppearanceFetcher,
StatsReporter: api.statsReporter,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate,
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
AccessURL: api.AccessURL,
AppHostname: api.AppHostname,
@@ -165,6 +167,22 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
}
}
// handleNetworkTelemetry converts a batch of tailnet telemetry events into
// telemetry NetworkEvents and reports them in a single Snapshot.
//
// Events that cannot be converted are dropped. If nothing remains after
// conversion (empty batch, or every event failed), no snapshot is reported.
func (api *API) handleNetworkTelemetry(batch []*tailnetproto.TelemetryEvent) {
	telemetryEvents := make([]telemetry.NetworkEvent, 0, len(batch))
	for _, pEvent := range batch {
		tEvent, err := telemetry.NetworkEventFromProto(pEvent)
		if err != nil {
			// Events that fail to be converted get discarded for now.
			continue
		}
		telemetryEvents = append(telemetryEvents, tEvent)
	}

	// Avoid reporting a snapshot that carries no data.
	if len(telemetryEvents) == 0 {
		return
	}

	api.Telemetry.Report(&telemetry.Snapshot{
		NetworkEvents: telemetryEvents,
	})
}
type yamuxPingerCloser struct {
mux *yamux.Session
}
@@ -50,10 +50,13 @@ func TestTailnetAPIConnector_Disconnects(t *testing.T) {
coordPtr.Store(&coord)
derpMapCh := make(chan *tailcfg.DERPMap)
defer close(derpMapCh)
svc, err := tailnet.NewClientService(
logger, &coordPtr,
time.Millisecond, func() *tailcfg.DERPMap { return <-derpMapCh },
)
svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Millisecond,
DERPMapFn: func() *tailcfg.DERPMap { return <-derpMapCh },
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) {},
})
require.NoError(t, err)
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+7 -6
View File
@@ -138,12 +138,13 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
}
return api.fetchRegions(ctx)
}
api.tailnetService, err = tailnet.NewClientService(
api.Logger.Named("tailnetclient"),
&api.AGPL.TailnetCoordinator,
api.Options.DERPMapUpdateFrequency,
api.AGPL.DERPMap,
)
api.tailnetService, err = tailnet.NewClientService(agpltailnet.ClientServiceOptions{
Logger: api.Logger.Named("tailnetclient"),
CoordPtr: &api.AGPL.TailnetCoordinator,
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
DERPMapFn: api.AGPL.DERPMap,
NetworkTelemetryHandler: api.AGPL.NetworkTelemetryBatcher.Handler,
})
if err != nil {
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
}
+8 -11
View File
@@ -6,12 +6,10 @@ import (
"encoding/json"
"errors"
"net"
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"tailscale.com/tailcfg"
"cdr.dev/slog"
"github.com/coder/coder/v2/apiversion"
@@ -25,15 +23,14 @@ type ClientService struct {
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
// loaded on each processed connection.
func NewClientService(
logger slog.Logger,
coordPtr *atomic.Pointer[agpl.Coordinator],
derpMapUpdateFrequency time.Duration,
derpMapFn func() *tailcfg.DERPMap,
) (
*ClientService, error,
) {
s, err := agpl.NewClientService(logger, coordPtr, derpMapUpdateFrequency, derpMapFn)
func NewClientService(options agpl.ClientServiceOptions) (*ClientService, error) {
s, err := agpl.NewClientService(agpl.ClientServiceOptions{
Logger: options.Logger,
CoordPtr: options.CoordPtr,
DERPMapUpdateFrequency: options.DERPMapUpdateFrequency,
DERPMapFn: options.DERPMapFn,
NetworkTelemetryHandler: options.NetworkTelemetryHandler,
})
if err != nil {
return nil, err
}
@@ -171,11 +171,13 @@ func TestDialCoordinator(t *testing.T) {
coordPtr := atomic.Pointer[agpl.Coordinator]{}
coordPtr.Store(&coord)
cSrv, err := tailnet.NewClientService(
logger, &coordPtr,
time.Hour,
func() *tailcfg.DERPMap { panic("not implemented") },
)
cSrv, err := tailnet.NewClientService(agpl.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Hour,
DERPMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) { panic("not implemented") },
})
require.NoError(t, err)
// buffer the channels here, so we don't need to read and write in goroutines to
+14 -10
View File
@@ -624,11 +624,13 @@ func TestRemoteCoordination(t *testing.T) {
var coord tailnet.Coordinator = mCoord
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
svc, err := tailnet.NewClientService(
logger.Named("svc"), &coordPtr,
time.Hour,
func() *tailcfg.DERPMap { panic("not implemented") },
)
svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger.Named("svc"),
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Hour,
DERPMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) { panic("not implemented") },
})
require.NoError(t, err)
sC, cC := net.Pipe()
@@ -673,11 +675,13 @@ func TestRemoteCoordination_SendsReadyForHandshake(t *testing.T) {
var coord tailnet.Coordinator = mCoord
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
svc, err := tailnet.NewClientService(
logger.Named("svc"), &coordPtr,
time.Hour,
func() *tailcfg.DERPMap { panic("not implemented") },
)
svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger.Named("svc"),
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Hour,
DERPMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) { panic("not implemented") },
})
require.NoError(t, err)
sC, cC := net.Pipe()
File diff suppressed because it is too large Load Diff
+20 -17
View File
@@ -102,6 +102,18 @@ message CoordinateResponse {
string error = 2;
}
// IPFields describes an IP address by its version and class.
message IPFields {
	// IP protocol version (presumably 4 or 6).
	int32 version = 1;
	// IPClass categorizes an address by its class.
	enum IPClass {
		PUBLIC = 0;
		PRIVATE = 1;
		LINK_LOCAL = 2;
		UNIQUE_LOCAL = 3;
		LOOPBACK = 4;
	}
	// Class of the address.
	IPClass class = 2;
}
message Netcheck {
bool UDP = 1;
bool IPv6 = 2;
@@ -123,8 +135,12 @@ message Netcheck {
map<int64, google.protobuf.Duration> RegionV4Latency = 15;
map<int64, google.protobuf.Duration> RegionV6Latency = 16;
string GlobalV4 = 17;
string GlobalV6 = 18;
message NetcheckIP {
string hash = 1;
IPFields fields = 2;
}
NetcheckIP GlobalV4 = 17;
NetcheckIP GlobalV6 = 18;
google.protobuf.BoolValue CaptivePortal = 19;
}
@@ -142,19 +158,6 @@ message TelemetryEvent {
WSPROXY = 3;
}
enum IPClass {
PUBLIC = 0;
PRIVATE = 1;
LINK_LOCAL = 2;
UNIQUE_LOCAL = 3;
LOOPBACK = 4;
}
message IPFields {
int32 version = 1;
IPClass class = 2;
}
message P2PEndpoint {
string hash = 1;
int32 port = 2;
@@ -167,8 +170,8 @@ message TelemetryEvent {
Status status = 4;
string disconnection_reason = 5;
ClientType client_type = 6;
string node_id_self = 7;
string node_id_remote = 8;
uint64 node_id_self = 7;
uint64 node_id_remote = 8;
P2PEndpoint p2p_endpoint = 9;
map<string, IPFields> log_ip_hashes = 10;
string home_derp = 11;
+132 -21
View File
@@ -4,21 +4,21 @@ import (
"context"
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/hashicorp/yamux"
"storj.io/drpc/drpcerr"
"golang.org/x/xerrors"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"tailscale.com/tailcfg"
"cdr.dev/slog"
"github.com/coder/coder/v2/apiversion"
"github.com/coder/coder/v2/clock"
"github.com/coder/coder/v2/tailnet/proto"
"golang.org/x/xerrors"
)
type streamIDContextKey struct{}
@@ -37,6 +37,14 @@ func WithStreamID(ctx context.Context, streamID StreamID) context.Context {
return context.WithValue(ctx, streamIDContextKey{}, streamID)
}
// ClientServiceOptions bundles the dependencies passed to NewClientService.
type ClientServiceOptions struct {
	// Logger is used by the service and by its dRPC server error log.
	Logger slog.Logger
	// CoordPtr points at the Coordinator used by the service.
	CoordPtr *atomic.Pointer[Coordinator]

	// DERPMapUpdateFrequency and DERPMapFn are forwarded to the DRPCService.
	DERPMapUpdateFrequency time.Duration
	DERPMapFn              func() *tailcfg.DERPMap

	// NetworkTelemetryHandler receives the events of each PostTelemetry
	// request. It may be nil, in which case posted events are ignored.
	NetworkTelemetryHandler func(batch []*proto.TelemetryEvent)
}
// ClientService is a tailnet coordination service that accepts a connection and version from a
// tailnet client, and support versions 1.0 and 2.x of the Tailnet API protocol.
type ClientService struct {
@@ -47,21 +55,17 @@ type ClientService struct {
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
// loaded on each processed connection.
func NewClientService(
logger slog.Logger,
coordPtr *atomic.Pointer[Coordinator],
derpMapUpdateFrequency time.Duration,
derpMapFn func() *tailcfg.DERPMap,
) (
func NewClientService(options ClientServiceOptions) (
*ClientService, error,
) {
s := &ClientService{Logger: logger, CoordPtr: coordPtr}
s := &ClientService{Logger: options.Logger, CoordPtr: options.CoordPtr}
mux := drpcmux.New()
drpcService := &DRPCService{
CoordPtr: coordPtr,
Logger: logger,
DerpMapUpdateFrequency: derpMapUpdateFrequency,
DerpMapFn: derpMapFn,
CoordPtr: options.CoordPtr,
Logger: options.Logger,
DerpMapUpdateFrequency: options.DERPMapUpdateFrequency,
DerpMapFn: options.DERPMapFn,
NetworkTelemetryHandler: options.NetworkTelemetryHandler,
}
err := proto.DRPCRegisterTailnet(mux, drpcService)
if err != nil {
@@ -74,7 +78,7 @@ func NewClientService(
xerrors.Is(err, context.DeadlineExceeded) {
return
}
logger.Debug(context.Background(), "drpc server error", slog.Error(err))
options.Logger.Debug(context.Background(), "drpc server error", slog.Error(err))
},
})
s.drpc = server
@@ -118,14 +122,18 @@ func (s ClientService) ServeConnV2(ctx context.Context, conn net.Conn, streamID
// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
type DRPCService struct {
CoordPtr *atomic.Pointer[Coordinator]
Logger slog.Logger
DerpMapUpdateFrequency time.Duration
DerpMapFn func() *tailcfg.DERPMap
CoordPtr *atomic.Pointer[Coordinator]
Logger slog.Logger
DerpMapUpdateFrequency time.Duration
DerpMapFn func() *tailcfg.DERPMap
NetworkTelemetryHandler func(batch []*proto.TelemetryEvent)
}
func (*DRPCService) PostTelemetry(context.Context, *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
return nil, drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
// PostTelemetry hands the events of a telemetry request to the configured
// NetworkTelemetryHandler, if there is one. It always returns an empty
// response and a nil error.
func (s *DRPCService) PostTelemetry(_ context.Context, req *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
	if handler := s.NetworkTelemetryHandler; handler != nil {
		handler(req.Events)
	}
	resp := &proto.TelemetryResponse{}
	return resp, nil
}
func (s *DRPCService) StreamDERPMaps(_ *proto.StreamDERPMapsRequest, stream proto.DRPCTailnet_StreamDERPMapsStream) error {
@@ -230,3 +238,106 @@ func (c communicator) loopResp() {
}
}
}
// NetworkTelemetryBatcher accumulates telemetry events and hands them to a
// callback in batches: when the ticker fires, when the number of pending
// events reaches maxSize, and once more when the batcher is closed.
type NetworkTelemetryBatcher struct {
	clock     clock.Clock
	frequency time.Duration
	maxSize   int
	batchFn   func(batch []*proto.TelemetryEvent)

	mu sync.Mutex
	// closed is closed by Close to ask the background goroutine to stop.
	closed chan struct{}
	// closeOnce ensures closed is closed exactly once, so Close can be called
	// repeatedly or concurrently without panicking.
	closeOnce sync.Once
	// done is closed by the background goroutine once it has exited.
	done    chan struct{}
	ticker  *clock.Ticker
	pending []*proto.TelemetryEvent
}

// NewNetworkTelemetryBatcher creates a batcher and starts its background
// goroutine. batchFn is called with each batch of events; a nil batchFn is
// replaced with a no-op so that flushing never dereferences a nil function.
// Call Close to stop the goroutine and flush any remaining events.
func NewNetworkTelemetryBatcher(clk clock.Clock, frequency time.Duration, maxSize int, batchFn func(batch []*proto.TelemetryEvent)) *NetworkTelemetryBatcher {
	if batchFn == nil {
		batchFn = func([]*proto.TelemetryEvent) {}
	}
	b := &NetworkTelemetryBatcher{
		clock:     clk,
		frequency: frequency,
		maxSize:   maxSize,
		batchFn:   batchFn,

		closed: make(chan struct{}),
		done:   make(chan struct{}),
	}
	b.start()
	return b
}

// Close signals the background goroutine to flush pending events and exit,
// then waits up to five seconds for it to finish. It returns an error if the
// goroutine has not finished within that time. Close is safe to call more
// than once; later calls only wait for the goroutine to finish.
func (b *NetworkTelemetryBatcher) Close() error {
	// Closing an already-closed channel panics, so guard it.
	b.closeOnce.Do(func() {
		close(b.closed)
	})

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	select {
	case <-ctx.Done():
		return xerrors.New("timed out waiting for batcher to close")
	case <-b.done:
	}
	return nil
}
// sendTelemetryBatch hands all pending events to batchFn and clears the
// pending list. It does nothing when there are no pending events.
//
// The pending slice is swapped out under the mutex, but batchFn runs after
// the mutex is released. Holding the lock across the callback would block
// every Handler call for as long as the callback takes. batchFn is still
// invoked synchronously, so this method returns only after the batch has been
// delivered. Handler already invokes batchFn from its own goroutines, so the
// callback being reachable concurrently is not new.
func (b *NetworkTelemetryBatcher) sendTelemetryBatch() {
	b.mu.Lock()
	events := b.pending
	if len(events) > 0 {
		b.pending = []*proto.TelemetryEvent{}
	}
	b.mu.Unlock()

	if len(events) == 0 {
		return
	}
	b.batchFn(events)
}
// start creates the ticker and launches the background goroutine that flushes
// pending events on every tick and once more when the batcher is closed.
func (b *NetworkTelemetryBatcher) start() {
	b.ticker = b.clock.NewTicker(b.frequency)

	// shutdown marks the goroutine as finished and stops the ticker.
	// The lock prevents Handler from racing with Close.
	shutdown := func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		close(b.done)
		b.ticker.Stop()
	}

	go func() {
		defer shutdown()
		for {
			select {
			case <-b.closed:
				// Send any remaining telemetry events before exiting.
				b.sendTelemetryBatch()
				return
			case <-b.ticker.C:
				b.sendTelemetryBatch()
				b.ticker.Reset(b.frequency)
			}
		}
	}()
}
// Handler queues the given events for the next batch. Events are ignored once
// the batcher has been closed. Whenever the number of pending events reaches
// maxSize, the pending events are handed to batchFn immediately on a new
// goroutine.
func (b *NetworkTelemetryBatcher) Handler(events []*proto.TelemetryEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()

	select {
	case <-b.closed:
		return
	default:
	}

	for _, event := range events {
		b.pending = append(b.pending, event)
		if len(b.pending) < b.maxSize {
			continue
		}

		// This can't call sendTelemetryBatch directly because we already
		// hold the lock.
		full := b.pending
		b.pending = []*proto.TelemetryEvent{}

		// Resetting the ticker is best effort. We don't care if the ticker
		// has already fired or has a pending message, because the only risk
		// is that we send two telemetry events in short succession (which
		// is totally fine).
		b.ticker.Reset(b.frequency)

		// Perform the send in a goroutine to avoid blocking the DRPC call.
		go b.batchFn(full)
	}
}
+87 -5
View File
@@ -8,12 +8,14 @@ import (
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"tailscale.com/tailcfg"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/clock"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/tailnet/tailnettest"
@@ -28,10 +30,17 @@ func TestClientService_ServeClient_V2(t *testing.T) {
coordPtr.Store(&coord)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
derpMap := &tailcfg.DERPMap{Regions: map[int]*tailcfg.DERPRegion{999: {RegionCode: "test"}}}
uut, err := tailnet.NewClientService(
logger, &coordPtr,
time.Millisecond, func() *tailcfg.DERPMap { return derpMap },
)
telemetryEvents := make(chan []*proto.TelemetryEvent, 64)
uut, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Millisecond,
DERPMapFn: func() *tailcfg.DERPMap { return derpMap },
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) {
telemetryEvents <- batch
},
})
require.NoError(t, err)
ctx := testutil.Context(t, testutil.WaitShort)
@@ -96,6 +105,25 @@ func TestClientService_ServeClient_V2(t *testing.T) {
err = dms.Close()
require.NoError(t, err)
// PostTelemetry
telemetryReq := &proto.TelemetryRequest{
Events: []*proto.TelemetryEvent{
{
Id: []byte("hi"),
},
{
Id: []byte("bye"),
},
},
}
res, err := client.PostTelemetry(ctx, telemetryReq)
require.NoError(t, err)
require.NotNil(t, res)
gotEvents := testutil.RequireRecvCtx(ctx, t, telemetryEvents)
require.Len(t, gotEvents, 2)
require.Equal(t, "hi", string(gotEvents[0].Id))
require.Equal(t, "bye", string(gotEvents[1].Id))
// RPCs closed; we need to close the Conn to end the session.
err = c.Close()
require.NoError(t, err)
@@ -110,7 +138,13 @@ func TestClientService_ServeClient_V1(t *testing.T) {
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut, err := tailnet.NewClientService(logger, &coordPtr, 0, nil)
uut, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: 0,
DERPMapFn: nil,
NetworkTelemetryHandler: nil,
})
require.NoError(t, err)
ctx := testutil.Context(t, testutil.WaitShort)
@@ -142,3 +176,51 @@ func TestClientService_ServeClient_V1(t *testing.T) {
err = testutil.RequireRecvCtx(ctx, t, errCh)
require.ErrorIs(t, err, expectedError)
}
// TestNetworkTelemetryBatcher checks the three ways a batch is flushed: when
// the pending events reach the maximum size, when the ticker fires, and when
// the batcher is closed.
func TestNetworkTelemetryBatcher(t *testing.T) {
	t.Parallel()

	const maxBatch = 3

	batches := make(chan []*proto.TelemetryEvent, 64)
	mockClock := clock.NewMock(t)
	batcher := tailnet.NewNetworkTelemetryBatcher(mockClock, time.Millisecond, maxBatch, func(batch []*proto.TelemetryEvent) {
		assert.LessOrEqual(t, len(batch), maxBatch)
		batches <- batch
	})

	// eventsWithIDs builds one event per ID, in order.
	eventsWithIDs := func(ids ...string) []*proto.TelemetryEvent {
		out := make([]*proto.TelemetryEvent, 0, len(ids))
		for _, id := range ids {
			out = append(out, &proto.TelemetryEvent{Id: []byte(id)})
		}
		return out
	}

	batcher.Handler(eventsWithIDs("1", "2"))
	batcher.Handler(eventsWithIDs("3", "4"))

	ctx := testutil.Context(t, testutil.WaitShort)

	// requireBatch receives the next batch and checks its event IDs.
	requireBatch := func(wantIDs ...string) {
		t.Helper()
		got := testutil.RequireRecvCtx(ctx, t, batches)
		require.Len(t, got, len(wantIDs))
		for i, want := range wantIDs {
			require.Equal(t, want, string(got[i].Id))
		}
	}

	// Should overflow and send a batch.
	requireBatch("1", "2", "3")

	// Should send any pending events when the ticker fires.
	mockClock.Advance(time.Millisecond)
	requireBatch("4")

	// Should send any pending events when closed.
	batcher.Handler(eventsWithIDs("5", "6"))
	err := batcher.Close()
	require.NoError(t, err)
	requireBatch("5", "6")
}
+12 -5
View File
@@ -38,6 +38,7 @@ import (
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/testutil"
)
@@ -169,11 +170,17 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux {
conns: make(map[uuid.UUID]net.Conn),
}
csvc, err := tailnet.NewClientService(logger, &coordPtr, 10*time.Minute, func() *tailcfg.DERPMap {
return &tailcfg.DERPMap{
// Clients will set their own based on their custom access URL.
Regions: map[int]*tailcfg.DERPRegion{},
}
csvc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: 10 * time.Minute,
DERPMapFn: func() *tailcfg.DERPMap {
return &tailcfg.DERPMap{
// Clients will set their own based on their custom access URL.
Regions: map[int]*tailcfg.DERPRegion{},
}
},
NetworkTelemetryHandler: func(batch []*tailnetproto.TelemetryEvent) {},
})
require.NoError(t, err)