mirror of
https://github.com/coder/coder.git
synced 2026-06-03 13:08:25 +00:00
6035e45cb8
Adds coderd_template_workspace_build_duration_seconds histogram that tracks the full duration from workspace build creation to agent ready. This captures the complete user-perceived build time including provisioning and agent startup. The metric is emitted when the agent reports ready/error/timeout via the lifecycle API, ensuring each build is counted exactly once per replica.
350 lines
11 KiB
Go
350 lines
11 KiB
Go
package agentapi
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net"
|
|
"net/url"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"golang.org/x/xerrors"
|
|
"storj.io/drpc/drpcmux"
|
|
"storj.io/drpc/drpcserver"
|
|
"tailscale.com/tailcfg"
|
|
|
|
"cdr.dev/slog/v3"
|
|
agentproto "github.com/coder/coder/v2/agent/proto"
|
|
"github.com/coder/coder/v2/coderd/agentapi/metadatabatcher"
|
|
"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
|
|
"github.com/coder/coder/v2/coderd/appearance"
|
|
"github.com/coder/coder/v2/coderd/boundaryusage"
|
|
"github.com/coder/coder/v2/coderd/connectionlog"
|
|
"github.com/coder/coder/v2/coderd/database"
|
|
"github.com/coder/coder/v2/coderd/database/pubsub"
|
|
"github.com/coder/coder/v2/coderd/externalauth"
|
|
"github.com/coder/coder/v2/coderd/notifications"
|
|
"github.com/coder/coder/v2/coderd/prometheusmetrics"
|
|
"github.com/coder/coder/v2/coderd/tracing"
|
|
"github.com/coder/coder/v2/coderd/workspacestats"
|
|
"github.com/coder/coder/v2/coderd/wspubsub"
|
|
"github.com/coder/coder/v2/codersdk"
|
|
"github.com/coder/coder/v2/codersdk/agentsdk"
|
|
"github.com/coder/coder/v2/codersdk/drpcsdk"
|
|
"github.com/coder/coder/v2/tailnet"
|
|
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
|
|
"github.com/coder/quartz"
|
|
)
|
|
|
|
const workspaceCacheRefreshInterval = 5 * time.Minute
|
|
|
|
// API implements the DRPC agent API interface from agent/proto. This struct is
|
|
// instantiated once per agent connection and kept alive for the duration of the
|
|
// session.
|
|
type API struct {
|
|
opts Options
|
|
*ManifestAPI
|
|
*AnnouncementBannerAPI
|
|
*StatsAPI
|
|
*LifecycleAPI
|
|
*AppsAPI
|
|
*MetadataAPI
|
|
*ResourcesMonitoringAPI
|
|
*LogsAPI
|
|
*ScriptsAPI
|
|
*ConnLogAPI
|
|
*SubAgentAPI
|
|
*BoundaryLogsAPI
|
|
*tailnet.DRPCService
|
|
|
|
cachedWorkspaceFields *CachedWorkspaceFields
|
|
|
|
mu sync.Mutex
|
|
}
|
|
|
|
var _ agentproto.DRPCAgentServer = &API{}
|
|
|
|
type Options struct {
|
|
AgentID uuid.UUID
|
|
OwnerID uuid.UUID
|
|
WorkspaceID uuid.UUID
|
|
OrganizationID uuid.UUID
|
|
TemplateVersionID uuid.UUID
|
|
|
|
AuthenticatedCtx context.Context
|
|
Log slog.Logger
|
|
Clock quartz.Clock
|
|
Database database.Store
|
|
NotificationsEnqueuer notifications.Enqueuer
|
|
Pubsub pubsub.Pubsub
|
|
ConnectionLogger *atomic.Pointer[connectionlog.ConnectionLogger]
|
|
DerpMapFn func() *tailcfg.DERPMap
|
|
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
|
|
StatsReporter *workspacestats.Reporter
|
|
MetadataBatcher *metadatabatcher.Batcher
|
|
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
|
|
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
|
|
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
|
|
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
|
|
BoundaryUsageTracker *boundaryusage.Tracker
|
|
LifecycleMetrics *LifecycleMetrics
|
|
|
|
AccessURL *url.URL
|
|
AppHostname string
|
|
AgentStatsRefreshInterval time.Duration
|
|
DisableDirectConnections bool
|
|
DerpForceWebSockets bool
|
|
DerpMapUpdateFrequency time.Duration
|
|
ExternalAuthConfigs []*externalauth.Config
|
|
Experiments codersdk.Experiments
|
|
|
|
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
|
|
}
|
|
|
|
func New(opts Options, workspace database.Workspace) *API {
|
|
if opts.Clock == nil {
|
|
opts.Clock = quartz.NewReal()
|
|
}
|
|
|
|
api := &API{
|
|
opts: opts,
|
|
mu: sync.Mutex{},
|
|
}
|
|
|
|
api.ManifestAPI = &ManifestAPI{
|
|
AccessURL: opts.AccessURL,
|
|
AppHostname: opts.AppHostname,
|
|
ExternalAuthConfigs: opts.ExternalAuthConfigs,
|
|
DisableDirectConnections: opts.DisableDirectConnections,
|
|
DerpForceWebSockets: opts.DerpForceWebSockets,
|
|
AgentFn: api.agent,
|
|
Database: opts.Database,
|
|
DerpMapFn: opts.DerpMapFn,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
}
|
|
|
|
// Don't cache details for prebuilds, though the cached fields will eventually be updated
|
|
// by the refresh routine once the prebuild workspace is claimed.
|
|
api.cachedWorkspaceFields = &CachedWorkspaceFields{}
|
|
if !workspace.IsPrebuild() {
|
|
api.cachedWorkspaceFields.UpdateValues(workspace)
|
|
}
|
|
|
|
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{
|
|
appearanceFetcher: opts.AppearanceFetcher,
|
|
}
|
|
|
|
api.ResourcesMonitoringAPI = &ResourcesMonitoringAPI{
|
|
AgentID: opts.AgentID,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
Clock: opts.Clock,
|
|
Database: opts.Database,
|
|
NotificationsEnqueuer: opts.NotificationsEnqueuer,
|
|
Debounce: 30 * time.Minute,
|
|
|
|
Config: resourcesmonitor.Config{
|
|
NumDatapoints: 20,
|
|
CollectionInterval: 10 * time.Second,
|
|
|
|
Alert: resourcesmonitor.AlertConfig{
|
|
MinimumNOKsPercent: 20,
|
|
ConsecutiveNOKsPercent: 50,
|
|
},
|
|
},
|
|
}
|
|
|
|
api.StatsAPI = &StatsAPI{
|
|
AgentFn: api.agent,
|
|
Workspace: api.cachedWorkspaceFields,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
StatsReporter: opts.StatsReporter,
|
|
AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval,
|
|
Experiments: opts.Experiments,
|
|
}
|
|
|
|
api.LifecycleAPI = &LifecycleAPI{
|
|
AgentFn: api.agent,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
|
Metrics: opts.LifecycleMetrics,
|
|
}
|
|
|
|
api.AppsAPI = &AppsAPI{
|
|
AgentFn: api.agent,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
|
}
|
|
|
|
api.MetadataAPI = &MetadataAPI{
|
|
AgentFn: api.agent,
|
|
Workspace: api.cachedWorkspaceFields,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
Batcher: opts.MetadataBatcher,
|
|
}
|
|
|
|
api.LogsAPI = &LogsAPI{
|
|
AgentFn: api.agent,
|
|
Database: opts.Database,
|
|
Log: opts.Log,
|
|
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
|
PublishWorkspaceAgentLogsUpdateFn: opts.PublishWorkspaceAgentLogsUpdateFn,
|
|
}
|
|
|
|
api.ScriptsAPI = &ScriptsAPI{
|
|
Database: opts.Database,
|
|
}
|
|
|
|
api.ConnLogAPI = &ConnLogAPI{
|
|
AgentFn: api.agent,
|
|
ConnectionLogger: opts.ConnectionLogger,
|
|
Database: opts.Database,
|
|
Workspace: api.cachedWorkspaceFields,
|
|
Log: opts.Log,
|
|
}
|
|
|
|
api.DRPCService = &tailnet.DRPCService{
|
|
CoordPtr: opts.TailnetCoordinator,
|
|
Logger: opts.Log,
|
|
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
|
|
DerpMapFn: opts.DerpMapFn,
|
|
NetworkTelemetryHandler: opts.NetworkTelemetryHandler,
|
|
}
|
|
|
|
api.SubAgentAPI = &SubAgentAPI{
|
|
OwnerID: opts.OwnerID,
|
|
OrganizationID: opts.OrganizationID,
|
|
AgentID: opts.AgentID,
|
|
AgentFn: api.agent,
|
|
Log: opts.Log,
|
|
Clock: opts.Clock,
|
|
Database: opts.Database,
|
|
}
|
|
|
|
api.BoundaryLogsAPI = &BoundaryLogsAPI{
|
|
Log: opts.Log,
|
|
WorkspaceID: opts.WorkspaceID,
|
|
OwnerID: opts.OwnerID,
|
|
TemplateID: workspace.TemplateID,
|
|
TemplateVersionID: opts.TemplateVersionID,
|
|
BoundaryUsageTracker: opts.BoundaryUsageTracker,
|
|
}
|
|
|
|
// Start background cache refresh loop to handle workspace changes
|
|
// like prebuild claims where owner_id and other fields may be modified in the DB.
|
|
go api.startCacheRefreshLoop(opts.AuthenticatedCtx)
|
|
|
|
return api
|
|
}
|
|
|
|
func (a *API) Server(ctx context.Context) (*drpcserver.Server, error) {
|
|
mux := drpcmux.New()
|
|
err := agentproto.DRPCRegisterAgent(mux, a)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("register agent API protocol in DRPC mux: %w", err)
|
|
}
|
|
|
|
err = tailnetproto.DRPCRegisterTailnet(mux, a)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("register tailnet API protocol in DRPC mux: %w", err)
|
|
}
|
|
|
|
return drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
|
|
drpcserver.Options{
|
|
Manager: drpcsdk.DefaultDRPCOptions(nil),
|
|
Log: func(err error) {
|
|
if xerrors.Is(err, io.EOF) {
|
|
return
|
|
}
|
|
a.opts.Log.Debug(ctx, "drpc server error", slog.Error(err))
|
|
},
|
|
},
|
|
), nil
|
|
}
|
|
|
|
func (a *API) Serve(ctx context.Context, l net.Listener) error {
|
|
server, err := a.Server(ctx)
|
|
if err != nil {
|
|
return xerrors.Errorf("create agent API server: %w", err)
|
|
}
|
|
|
|
if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil {
|
|
return xerrors.Errorf("initialize resource monitoring: %w", err)
|
|
}
|
|
|
|
return server.Serve(ctx, l)
|
|
}
|
|
|
|
func (a *API) agent(ctx context.Context) (database.WorkspaceAgent, error) {
|
|
agent, err := a.opts.Database.GetWorkspaceAgentByID(ctx, a.opts.AgentID)
|
|
if err != nil {
|
|
return database.WorkspaceAgent{}, xerrors.Errorf("get workspace agent by id %q: %w", a.opts.AgentID, err)
|
|
}
|
|
return agent, nil
|
|
}
|
|
|
|
// refreshCachedWorkspace periodically updates the cached workspace fields.
|
|
// This ensures that changes like prebuild claims (which modify owner_id, name, etc.)
|
|
// are eventually reflected in the cache without requiring agent reconnection.
|
|
func (a *API) refreshCachedWorkspace(ctx context.Context) {
|
|
ws, err := a.opts.Database.GetWorkspaceByID(ctx, a.opts.WorkspaceID)
|
|
if err != nil {
|
|
a.opts.Log.Warn(ctx, "failed to refresh cached workspace fields", slog.Error(err))
|
|
a.cachedWorkspaceFields.Clear()
|
|
return
|
|
}
|
|
|
|
if ws.IsPrebuild() {
|
|
return
|
|
}
|
|
|
|
// If we still have the same values, skip the update and logging calls.
|
|
if a.cachedWorkspaceFields.identity.Equal(database.WorkspaceIdentityFromWorkspace(ws)) {
|
|
return
|
|
}
|
|
// Update fields that can change during workspace lifecycle (e.g., AutostartSchedule)
|
|
a.cachedWorkspaceFields.UpdateValues(ws)
|
|
|
|
a.opts.Log.Debug(ctx, "refreshed cached workspace fields",
|
|
slog.F("workspace_id", ws.ID),
|
|
slog.F("owner_id", ws.OwnerID),
|
|
slog.F("name", ws.Name))
|
|
}
|
|
|
|
// startCacheRefreshLoop runs a background goroutine that periodically refreshes
|
|
// the cached workspace fields. This is primarily needed to handle prebuild claims
|
|
// where the owner_id and other fields change while the agent connection persists.
|
|
func (a *API) startCacheRefreshLoop(ctx context.Context) {
|
|
// Refresh every 5 minutes. This provides a reasonable balance between:
|
|
// - Keeping cache fresh for prebuild claims and other workspace updates
|
|
// - Minimizing unnecessary database queries
|
|
ticker := a.opts.Clock.TickerFunc(ctx, workspaceCacheRefreshInterval, func() error {
|
|
a.refreshCachedWorkspace(ctx)
|
|
return nil
|
|
}, "cache_refresh")
|
|
|
|
// We need to wait on the ticker exiting.
|
|
_ = ticker.Wait()
|
|
|
|
a.opts.Log.Debug(ctx, "cache refresh loop exited, invalidating the workspace cache on agent API",
|
|
slog.F("workspace_id", a.cachedWorkspaceFields.identity.ID),
|
|
slog.F("owner_id", a.cachedWorkspaceFields.identity.OwnerUsername),
|
|
slog.F("name", a.cachedWorkspaceFields.identity.Name))
|
|
a.cachedWorkspaceFields.Clear()
|
|
}
|
|
|
|
func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
|
|
a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, wspubsub.WorkspaceEvent{
|
|
Kind: kind,
|
|
WorkspaceID: a.opts.WorkspaceID,
|
|
AgentID: &agent.ID,
|
|
})
|
|
return nil
|
|
}
|