package agentapi import ( "context" "io" "net" "net/url" "sync" "sync/atomic" "time" "github.com/google/uuid" "golang.org/x/xerrors" "storj.io/drpc/drpcmux" "storj.io/drpc/drpcserver" "tailscale.com/tailcfg" "cdr.dev/slog/v3" agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/agentapi/metadatabatcher" "github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor" "github.com/coder/coder/v2/coderd/appearance" "github.com/coder/coder/v2/coderd/boundaryusage" "github.com/coder/coder/v2/coderd/connectionlog" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/externalauth" "github.com/coder/coder/v2/coderd/notifications" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/tracing" "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/coderd/wspubsub" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/drpcsdk" "github.com/coder/coder/v2/tailnet" tailnetproto "github.com/coder/coder/v2/tailnet/proto" "github.com/coder/quartz" ) const workspaceCacheRefreshInterval = 5 * time.Minute // API implements the DRPC agent API interface from agent/proto. This struct is // instantiated once per agent connection and kept alive for the duration of the // session. type API struct { opts Options *ManifestAPI *AnnouncementBannerAPI *StatsAPI *LifecycleAPI *AppsAPI *MetadataAPI *ResourcesMonitoringAPI *LogsAPI *ScriptsAPI *ConnLogAPI *SubAgentAPI *BoundaryLogsAPI *tailnet.DRPCService cachedWorkspaceFields *CachedWorkspaceFields mu sync.Mutex } var _ agentproto.DRPCAgentServer = &API{} type Options struct { AgentID uuid.UUID OwnerID uuid.UUID WorkspaceID uuid.UUID OrganizationID uuid.UUID TemplateVersionID uuid.UUID AuthenticatedCtx context.Context Log slog.Logger Clock quartz.Clock Database database.Store NotificationsEnqueuer notifications.Enqueuer Pubsub pubsub.Pubsub ConnectionLogger *atomic.Pointer[connectionlog.ConnectionLogger] DerpMapFn func() *tailcfg.DERPMap TailnetCoordinator *atomic.Pointer[tailnet.Coordinator] StatsReporter *workspacestats.Reporter MetadataBatcher *metadatabatcher.Batcher AppearanceFetcher *atomic.Pointer[appearance.Fetcher] PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent) PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage) NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent) BoundaryUsageTracker *boundaryusage.Tracker LifecycleMetrics *LifecycleMetrics AccessURL *url.URL AppHostname string AgentStatsRefreshInterval time.Duration DisableDirectConnections bool DerpForceWebSockets bool DerpMapUpdateFrequency time.Duration ExternalAuthConfigs []*externalauth.Config Experiments codersdk.Experiments UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) } func New(opts Options, workspace database.Workspace, agent database.WorkspaceAgent) *API { if opts.Clock == nil { opts.Clock = quartz.NewReal() } api := &API{ opts: opts, mu: sync.Mutex{}, } api.ManifestAPI = &ManifestAPI{ AccessURL: opts.AccessURL, AppHostname: opts.AppHostname, ExternalAuthConfigs: opts.ExternalAuthConfigs, DisableDirectConnections: opts.DisableDirectConnections, DerpForceWebSockets: opts.DerpForceWebSockets, AgentFn: api.agent, Database: opts.Database, DerpMapFn: opts.DerpMapFn, WorkspaceID: opts.WorkspaceID, } // Don't cache details for prebuilds, though the cached fields will eventually be updated // by the refresh routine once the prebuild workspace is claimed. api.cachedWorkspaceFields = &CachedWorkspaceFields{} if !workspace.IsPrebuild() { api.cachedWorkspaceFields.UpdateValues(workspace) } api.AnnouncementBannerAPI = &AnnouncementBannerAPI{ appearanceFetcher: opts.AppearanceFetcher, } api.ResourcesMonitoringAPI = &ResourcesMonitoringAPI{ AgentID: opts.AgentID, WorkspaceID: opts.WorkspaceID, Clock: opts.Clock, Database: opts.Database, NotificationsEnqueuer: opts.NotificationsEnqueuer, Debounce: 30 * time.Minute, Config: resourcesmonitor.Config{ NumDatapoints: 20, CollectionInterval: 10 * time.Second, Alert: resourcesmonitor.AlertConfig{ MinimumNOKsPercent: 20, ConsecutiveNOKsPercent: 50, }, }, } api.StatsAPI = &StatsAPI{ AgentID: agent.ID, AgentName: agent.Name, Workspace: api.cachedWorkspaceFields, Database: opts.Database, Log: opts.Log, StatsReporter: opts.StatsReporter, AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval, Experiments: opts.Experiments, } api.LifecycleAPI = &LifecycleAPI{ AgentFn: api.agent, WorkspaceID: opts.WorkspaceID, Database: opts.Database, Log: opts.Log, PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate, Metrics: opts.LifecycleMetrics, } api.AppsAPI = &AppsAPI{ AgentID: agent.ID, AgentFn: api.agent, Database: opts.Database, Log: opts.Log, Workspace: api.cachedWorkspaceFields, PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate, Clock: opts.Clock, NotificationsEnqueuer: opts.NotificationsEnqueuer, } api.MetadataAPI = &MetadataAPI{ AgentID: agent.ID, Workspace: api.cachedWorkspaceFields, Database: opts.Database, Log: opts.Log, Batcher: opts.MetadataBatcher, } api.LogsAPI = &LogsAPI{ AgentFn: api.agent, Database: opts.Database, Log: opts.Log, PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate, PublishWorkspaceAgentLogsUpdateFn: opts.PublishWorkspaceAgentLogsUpdateFn, } api.ScriptsAPI = &ScriptsAPI{ Database: opts.Database, } api.ConnLogAPI = &ConnLogAPI{ AgentID: agent.ID, AgentName: agent.Name, ConnectionLogger: opts.ConnectionLogger, Database: opts.Database, Workspace: api.cachedWorkspaceFields, Log: opts.Log, } api.DRPCService = &tailnet.DRPCService{ CoordPtr: opts.TailnetCoordinator, Logger: opts.Log, DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency, DerpMapFn: opts.DerpMapFn, NetworkTelemetryHandler: opts.NetworkTelemetryHandler, } api.SubAgentAPI = &SubAgentAPI{ OwnerID: opts.OwnerID, OrganizationID: opts.OrganizationID, AgentFn: api.agent, Log: opts.Log, Clock: opts.Clock, Database: opts.Database, } api.BoundaryLogsAPI = &BoundaryLogsAPI{ Log: opts.Log, WorkspaceID: opts.WorkspaceID, OwnerID: opts.OwnerID, TemplateID: workspace.TemplateID, TemplateVersionID: opts.TemplateVersionID, BoundaryUsageTracker: opts.BoundaryUsageTracker, } // Start background cache refresh loop to handle workspace changes // like prebuild claims where owner_id and other fields may be modified in the DB. go api.startCacheRefreshLoop(opts.AuthenticatedCtx) return api } func (a *API) Server(ctx context.Context) (*drpcserver.Server, error) { mux := drpcmux.New() err := agentproto.DRPCRegisterAgent(mux, a) if err != nil { return nil, xerrors.Errorf("register agent API protocol in DRPC mux: %w", err) } err = tailnetproto.DRPCRegisterTailnet(mux, a) if err != nil { return nil, xerrors.Errorf("register tailnet API protocol in DRPC mux: %w", err) } return drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux}, drpcserver.Options{ Manager: drpcsdk.DefaultDRPCOptions(nil), Log: func(err error) { if xerrors.Is(err, io.EOF) { return } a.opts.Log.Debug(ctx, "drpc server error", slog.Error(err)) }, }, ), nil } func (a *API) Serve(ctx context.Context, l net.Listener) error { server, err := a.Server(ctx) if err != nil { return xerrors.Errorf("create agent API server: %w", err) } if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil { return xerrors.Errorf("initialize resource monitoring: %w", err) } return server.Serve(ctx, l) } func (a *API) agent(ctx context.Context) (database.WorkspaceAgent, error) { agent, err := a.opts.Database.GetWorkspaceAgentByID(ctx, a.opts.AgentID) if err != nil { return database.WorkspaceAgent{}, xerrors.Errorf("get workspace agent by id %q: %w", a.opts.AgentID, err) } return agent, nil } // refreshCachedWorkspace periodically updates the cached workspace fields. // This ensures that changes like prebuild claims (which modify owner_id, name, etc.) // are eventually reflected in the cache without requiring agent reconnection. func (a *API) refreshCachedWorkspace(ctx context.Context) { ws, err := a.opts.Database.GetWorkspaceByID(ctx, a.opts.WorkspaceID) if err != nil { // Do not clear the cache on transient DB errors. Stale data is // preferable to no data, which forces callers to fall back to // expensive queries like GetWorkspaceByAgentID. a.opts.Log.Warn(ctx, "failed to refresh cached workspace fields", slog.Error(err)) return } if ws.IsPrebuild() { return } // If we still have the same values, skip the update and logging calls. if a.cachedWorkspaceFields.identity.Equal(database.WorkspaceIdentityFromWorkspace(ws)) { return } // Update fields that can change during workspace lifecycle (e.g., AutostartSchedule) a.cachedWorkspaceFields.UpdateValues(ws) a.opts.Log.Debug(ctx, "refreshed cached workspace fields", slog.F("workspace_id", ws.ID), slog.F("owner_id", ws.OwnerID), slog.F("name", ws.Name)) } // startCacheRefreshLoop runs a background goroutine that periodically refreshes // the cached workspace fields. This is primarily needed to handle prebuild claims // where the owner_id and other fields change while the agent connection persists. func (a *API) startCacheRefreshLoop(ctx context.Context) { // Refresh every 5 minutes. This provides a reasonable balance between: // - Keeping cache fresh for prebuild claims and other workspace updates // - Minimizing unnecessary database queries ticker := a.opts.Clock.TickerFunc(ctx, workspaceCacheRefreshInterval, func() error { a.refreshCachedWorkspace(ctx) return nil }, "cache_refresh") // We need to wait on the ticker exiting. _ = ticker.Wait() a.opts.Log.Debug(ctx, "cache refresh loop exited, invalidating the workspace cache on agent API", slog.F("workspace_id", a.cachedWorkspaceFields.identity.ID), slog.F("owner_id", a.cachedWorkspaceFields.identity.OwnerUsername), slog.F("name", a.cachedWorkspaceFields.identity.Name)) a.cachedWorkspaceFields.Clear() } func (a *API) publishWorkspaceUpdate(ctx context.Context, agentID uuid.UUID, kind wspubsub.WorkspaceEventKind) error { a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, wspubsub.WorkspaceEvent{ Kind: kind, WorkspaceID: a.opts.WorkspaceID, AgentID: &agentID, }) return nil }