mirror of
https://github.com/coder/coder.git
synced 2026-06-03 13:08:25 +00:00
2204731ddb
Implements telemetry for boundary usage tracking across all Coder replicas and reports them via telemetry. Changes: - Implement Tracker with Track(), FlushToDB(), and StartFlushLoop() methods - Add telemetry integration via collectBoundaryUsageSummary() - Use telemetry lock to ensure only one replica collects per period The tracker accumulates unique workspaces, unique users, and request counts (allowed/denied) in memory, then flushes to the database periodically. During telemetry collection, stats are aggregated across all replicas and reset for the next period.
348 lines
11 KiB
Go
348 lines
11 KiB
Go
package agentapi
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net"
|
|
"net/url"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"golang.org/x/xerrors"
|
|
"storj.io/drpc/drpcmux"
|
|
"storj.io/drpc/drpcserver"
|
|
"tailscale.com/tailcfg"
|
|
|
|
"cdr.dev/slog/v3"
|
|
agentproto "github.com/coder/coder/v2/agent/proto"
|
|
"github.com/coder/coder/v2/coderd/agentapi/metadatabatcher"
|
|
"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
|
|
"github.com/coder/coder/v2/coderd/appearance"
|
|
"github.com/coder/coder/v2/coderd/boundaryusage"
|
|
"github.com/coder/coder/v2/coderd/connectionlog"
|
|
"github.com/coder/coder/v2/coderd/database"
|
|
"github.com/coder/coder/v2/coderd/database/pubsub"
|
|
"github.com/coder/coder/v2/coderd/externalauth"
|
|
"github.com/coder/coder/v2/coderd/notifications"
|
|
"github.com/coder/coder/v2/coderd/prometheusmetrics"
|
|
"github.com/coder/coder/v2/coderd/tracing"
|
|
"github.com/coder/coder/v2/coderd/workspacestats"
|
|
"github.com/coder/coder/v2/coderd/wspubsub"
|
|
"github.com/coder/coder/v2/codersdk"
|
|
"github.com/coder/coder/v2/codersdk/agentsdk"
|
|
"github.com/coder/coder/v2/codersdk/drpcsdk"
|
|
"github.com/coder/coder/v2/tailnet"
|
|
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
|
|
"github.com/coder/quartz"
|
|
)
|
|
|
|
const workspaceCacheRefreshInterval = 5 * time.Minute
|
|
|
|
// API implements the DRPC agent API interface from agent/proto. This struct is
|
|
// instantiated once per agent connection and kept alive for the duration of the
|
|
// session.
|
|
type API struct {
|
|
opts Options
|
|
*ManifestAPI
|
|
*AnnouncementBannerAPI
|
|
*StatsAPI
|
|
*LifecycleAPI
|
|
*AppsAPI
|
|
*MetadataAPI
|
|
*ResourcesMonitoringAPI
|
|
*LogsAPI
|
|
*ScriptsAPI
|
|
*ConnLogAPI
|
|
*SubAgentAPI
|
|
*BoundaryLogsAPI
|
|
*tailnet.DRPCService
|
|
|
|
cachedWorkspaceFields *CachedWorkspaceFields
|
|
|
|
mu sync.Mutex
|
|
}
|
|
|
|
var _ agentproto.DRPCAgentServer = &API{}
|
|
|
|
type Options struct {
|
|
AgentID uuid.UUID
|
|
OwnerID uuid.UUID
|
|
WorkspaceID uuid.UUID
|
|
OrganizationID uuid.UUID
|
|
TemplateVersionID uuid.UUID
|
|
|
|
AuthenticatedCtx context.Context
|
|
Log slog.Logger
|
|
Clock quartz.Clock
|
|
Database database.Store
|
|
NotificationsEnqueuer notifications.Enqueuer
|
|
Pubsub pubsub.Pubsub
|
|
ConnectionLogger *atomic.Pointer[connectionlog.ConnectionLogger]
|
|
DerpMapFn func() *tailcfg.DERPMap
|
|
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
|
|
StatsReporter *workspacestats.Reporter
|
|
MetadataBatcher *metadatabatcher.Batcher
|
|
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
|
|
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
|
|
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
|
|
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
|
|
BoundaryUsageTracker *boundaryusage.Tracker
|
|
|
|
AccessURL *url.URL
|
|
AppHostname string
|
|
AgentStatsRefreshInterval time.Duration
|
|
DisableDirectConnections bool
|
|
DerpForceWebSockets bool
|
|
DerpMapUpdateFrequency time.Duration
|
|
ExternalAuthConfigs []*externalauth.Config
|
|
Experiments codersdk.Experiments
|
|
|
|
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
|
|
}
|
|
|
|
func New(opts Options, workspace database.Workspace) *API {
|
|
if opts.Clock == nil {
|
|
opts.Clock = quartz.NewReal()
|
|
}
|
|
|
|
api := &API{
|
|
opts: opts,
|
|
mu: sync.Mutex{},
|
|
}
|
|
|
|
api.ManifestAPI = &ManifestAPI{
|
|
AccessURL: opts.AccessURL,
|
|
AppHostname: opts.AppHostname,
|
|
ExternalAuthConfigs: opts.ExternalAuthConfigs,
|
|
DisableDirectConnections: opts.DisableDirectConnections,
|
|
DerpForceWebSockets: opts.DerpForceWebSockets,
|
|
AgentFn: api.agent,
|
|
Database: opts.Database,
|
|
DerpMapFn: opts.DerpMapFn,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
}
|
|
|
|
// Don't cache details for prebuilds, though the cached fields will eventually be updated
|
|
// by the refresh routine once the prebuild workspace is claimed.
|
|
api.cachedWorkspaceFields = &CachedWorkspaceFields{}
|
|
if !workspace.IsPrebuild() {
|
|
api.cachedWorkspaceFields.UpdateValues(workspace)
|
|
}
|
|
|
|
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{
|
|
appearanceFetcher: opts.AppearanceFetcher,
|
|
}
|
|
|
|
api.ResourcesMonitoringAPI = &ResourcesMonitoringAPI{
|
|
AgentID: opts.AgentID,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
Clock: opts.Clock,
|
|
Database: opts.Database,
|
|
NotificationsEnqueuer: opts.NotificationsEnqueuer,
|
|
Debounce: 30 * time.Minute,
|
|
|
|
Config: resourcesmonitor.Config{
|
|
NumDatapoints: 20,
|
|
CollectionInterval: 10 * time.Second,
|
|
|
|
Alert: resourcesmonitor.AlertConfig{
|
|
MinimumNOKsPercent: 20,
|
|
ConsecutiveNOKsPercent: 50,
|
|
},
|
|
},
|
|
}
|
|
|
|
api.StatsAPI = &StatsAPI{
|
|
AgentFn: api.agent,
|
|
Workspace: api.cachedWorkspaceFields,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
StatsReporter: opts.StatsReporter,
|
|
AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval,
|
|
Experiments: opts.Experiments,
|
|
}
|
|
|
|
api.LifecycleAPI = &LifecycleAPI{
|
|
AgentFn: api.agent,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
|
}
|
|
|
|
api.AppsAPI = &AppsAPI{
|
|
AgentFn: api.agent,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
|
}
|
|
|
|
api.MetadataAPI = &MetadataAPI{
|
|
AgentFn: api.agent,
|
|
Workspace: api.cachedWorkspaceFields,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
Batcher: opts.MetadataBatcher,
|
|
}
|
|
|
|
api.LogsAPI = &LogsAPI{
|
|
AgentFn: api.agent,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
|
PublishWorkspaceAgentLogsUpdateFn: opts.PublishWorkspaceAgentLogsUpdateFn,
|
|
}
|
|
|
|
api.ScriptsAPI = &ScriptsAPI{
|
|
Database: opts.Database,
|
|
}
|
|
|
|
api.ConnLogAPI = &ConnLogAPI{
|
|
AgentFn: api.agent,
|
|
ConnectionLogger: opts.ConnectionLogger,
|
|
Database: opts.Database,
|
|
Workspace: api.cachedWorkspaceFields,
|
|
Log: opts.Log,
|
|
}
|
|
|
|
api.DRPCService = &tailnet.DRPCService{
|
|
CoordPtr: opts.TailnetCoordinator,
|
|
Logger: opts.Log,
|
|
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
|
|
DerpMapFn: opts.DerpMapFn,
|
|
NetworkTelemetryHandler: opts.NetworkTelemetryHandler,
|
|
}
|
|
|
|
api.SubAgentAPI = &SubAgentAPI{
|
|
OwnerID: opts.OwnerID,
|
|
OrganizationID: opts.OrganizationID,
|
|
AgentID: opts.AgentID,
|
|
AgentFn: api.agent,
|
|
Log: opts.Log,
|
|
Clock: opts.Clock,
|
|
Database: opts.Database,
|
|
}
|
|
|
|
api.BoundaryLogsAPI = &BoundaryLogsAPI{
|
|
Log: opts.Log,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
OwnerID: opts.OwnerID,
|
|
TemplateID: workspace.TemplateID,
|
|
TemplateVersionID: opts.TemplateVersionID,
|
|
BoundaryUsageTracker: opts.BoundaryUsageTracker,
|
|
}
|
|
|
|
// Start background cache refresh loop to handle workspace changes
|
|
// like prebuild claims where owner_id and other fields may be modified in the DB.
|
|
go api.startCacheRefreshLoop(opts.AuthenticatedCtx)
|
|
|
|
return api
|
|
}
|
|
|
|
func (a *API) Server(ctx context.Context) (*drpcserver.Server, error) {
|
|
mux := drpcmux.New()
|
|
err := agentproto.DRPCRegisterAgent(mux, a)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("register agent API protocol in DRPC mux: %w", err)
|
|
}
|
|
|
|
err = tailnetproto.DRPCRegisterTailnet(mux, a)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("register tailnet API protocol in DRPC mux: %w", err)
|
|
}
|
|
|
|
return drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
|
|
drpcserver.Options{
|
|
Manager: drpcsdk.DefaultDRPCOptions(nil),
|
|
Log: func(err error) {
|
|
if xerrors.Is(err, io.EOF) {
|
|
return
|
|
}
|
|
a.opts.Log.Debug(ctx, "drpc server error", slog.Error(err))
|
|
},
|
|
},
|
|
), nil
|
|
}
|
|
|
|
func (a *API) Serve(ctx context.Context, l net.Listener) error {
|
|
server, err := a.Server(ctx)
|
|
if err != nil {
|
|
return xerrors.Errorf("create agent API server: %w", err)
|
|
}
|
|
|
|
if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil {
|
|
return xerrors.Errorf("initialize resource monitoring: %w", err)
|
|
}
|
|
|
|
return server.Serve(ctx, l)
|
|
}
|
|
|
|
func (a *API) agent(ctx context.Context) (database.WorkspaceAgent, error) {
|
|
agent, err := a.opts.Database.GetWorkspaceAgentByID(ctx, a.opts.AgentID)
|
|
if err != nil {
|
|
return database.WorkspaceAgent{}, xerrors.Errorf("get workspace agent by id %q: %w", a.opts.AgentID, err)
|
|
}
|
|
return agent, nil
|
|
}
|
|
|
|
// refreshCachedWorkspace periodically updates the cached workspace fields.
|
|
// This ensures that changes like prebuild claims (which modify owner_id, name, etc.)
|
|
// are eventually reflected in the cache without requiring agent reconnection.
|
|
func (a *API) refreshCachedWorkspace(ctx context.Context) {
|
|
ws, err := a.opts.Database.GetWorkspaceByID(ctx, a.opts.WorkspaceID)
|
|
if err != nil {
|
|
a.opts.Log.Warn(ctx, "failed to refresh cached workspace fields", slog.Error(err))
|
|
a.cachedWorkspaceFields.Clear()
|
|
return
|
|
}
|
|
|
|
if ws.IsPrebuild() {
|
|
return
|
|
}
|
|
|
|
// If we still have the same values, skip the update and logging calls.
|
|
if a.cachedWorkspaceFields.identity.Equal(database.WorkspaceIdentityFromWorkspace(ws)) {
|
|
return
|
|
}
|
|
// Update fields that can change during workspace lifecycle (e.g., AutostartSchedule)
|
|
a.cachedWorkspaceFields.UpdateValues(ws)
|
|
|
|
a.opts.Log.Debug(ctx, "refreshed cached workspace fields",
|
|
slog.F("workspace_id", ws.ID),
|
|
slog.F("owner_id", ws.OwnerID),
|
|
slog.F("name", ws.Name))
|
|
}
|
|
|
|
// startCacheRefreshLoop runs a background goroutine that periodically refreshes
|
|
// the cached workspace fields. This is primarily needed to handle prebuild claims
|
|
// where the owner_id and other fields change while the agent connection persists.
|
|
func (a *API) startCacheRefreshLoop(ctx context.Context) {
|
|
// Refresh every 5 minutes. This provides a reasonable balance between:
|
|
// - Keeping cache fresh for prebuild claims and other workspace updates
|
|
// - Minimizing unnecessary database queries
|
|
ticker := a.opts.Clock.TickerFunc(ctx, workspaceCacheRefreshInterval, func() error {
|
|
a.refreshCachedWorkspace(ctx)
|
|
return nil
|
|
}, "cache_refresh")
|
|
|
|
// We need to wait on the ticker exiting.
|
|
_ = ticker.Wait()
|
|
|
|
a.opts.Log.Debug(ctx, "cache refresh loop exited, invalidating the workspace cache on agent API",
|
|
slog.F("workspace_id", a.cachedWorkspaceFields.identity.ID),
|
|
slog.F("owner_id", a.cachedWorkspaceFields.identity.OwnerUsername),
|
|
slog.F("name", a.cachedWorkspaceFields.identity.Name))
|
|
a.cachedWorkspaceFields.Clear()
|
|
}
|
|
|
|
func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
|
|
a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, wspubsub.WorkspaceEvent{
|
|
Kind: kind,
|
|
WorkspaceID: a.opts.WorkspaceID,
|
|
AgentID: &agent.ID,
|
|
})
|
|
return nil
|
|
}
|