Files
coder/coderd/agentapi/api.go
T
Callum Styan 45c43d4ec4 fix: refactor agent resource monitoring API to avoid excessive calls to DB (#20430)
This should resolve https://github.com/coder/internal/issues/728 by
refactoring the ResourceMonitorAPI struct to only require querying the
resource monitor once for memory and once for volumes, then using the
stored monitors on the API struct from that point on. This should
eliminate the vast majority of calls to `GetWorkspaceByAgentID` and
`FetchVolumesResourceMonitorsUpdatedAfter`/`FetchMemoryResourceMonitorsUpdatedAfter`
(millions of calls per week).

Tests passed, and I ran an instance of coder via a workspace with a
template that added resource monitoring every 10s. Note that this is the
default docker container, so there are other sources of
`GetWorkspaceByAgentID` db queries. Note that this workspace was running
for ~15 minutes at the time I gathered this data.

Over 30s for the `ResourceMonitor` calls:
```
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep ResourceMonitor | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsUpdatedAfter"} 2
100  288k    0  288k    0     0  58.3M      0 --:--:-- --:--:-- --:--:-- 70.4M
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsUpdatedAfter"} 2
coderd_db_query_latencies_seconds_count{query="UpdateMemoryResourceMonitor"} 155
coderd_db_query_latencies_seconds_count{query="UpdateVolumeResourceMonitor"} 155
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep ResourceMonitor | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsUpdatedAfter"} 2
100  288k    0  288k    0     0  34.7M      0 --:--:-- --:--:-- --:--:-- 40.2M
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsUpdatedAfter"} 2
coderd_db_query_latencies_seconds_count{query="UpdateMemoryResourceMonitor"} 158
coderd_db_query_latencies_seconds_count{query="UpdateVolumeResourceMonitor"} 158
```

And over 1m for the `GetWorkspaceAgentByID` calls, the majority are from
the workspace metadata stats updates:
```
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep GetWorkspaceByAgentID | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  284k    0  284k    0     0  42.4M      0 --:--:-- --:--:-- --:--:-- 46.3M
coderd_db_query_latencies_seconds_count{query="GetWorkspaceByAgentID"} 876
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep GetWorkspaceByAgentID | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  284k    0  284k    0     0  75.4M      0 --:--:-- --:--:-- --:--:-- 92.7M
coderd_db_query_latencies_seconds_count{query="GetWorkspaceByAgentID"} 918
```

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
2025-10-28 13:38:16 -07:00

265 lines
7.8 KiB
Go

package agentapi
import (
"context"
"io"
"net"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"tailscale.com/tailcfg"
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
"github.com/coder/coder/v2/coderd/appearance"
"github.com/coder/coder/v2/coderd/connectionlog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/coderd/wspubsub"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/drpcsdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
)
// API implements the DRPC agent API interface from agent/proto. This struct is
// instantiated once per agent connection and kept alive for the duration of the
// session.
type API struct {
opts Options
*ManifestAPI
*AnnouncementBannerAPI
*StatsAPI
*LifecycleAPI
*AppsAPI
*MetadataAPI
*ResourcesMonitoringAPI
*LogsAPI
*ScriptsAPI
*ConnLogAPI
*SubAgentAPI
*tailnet.DRPCService
mu sync.Mutex
}
var _ agentproto.DRPCAgentServer = &API{}
type Options struct {
AgentID uuid.UUID
OwnerID uuid.UUID
WorkspaceID uuid.UUID
OrganizationID uuid.UUID
Ctx context.Context
Log slog.Logger
Clock quartz.Clock
Database database.Store
NotificationsEnqueuer notifications.Enqueuer
Pubsub pubsub.Pubsub
ConnectionLogger *atomic.Pointer[connectionlog.ConnectionLogger]
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
StatsReporter *workspacestats.Reporter
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
AccessURL *url.URL
AppHostname string
AgentStatsRefreshInterval time.Duration
DisableDirectConnections bool
DerpForceWebSockets bool
DerpMapUpdateFrequency time.Duration
ExternalAuthConfigs []*externalauth.Config
Experiments codersdk.Experiments
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
}
func New(opts Options) *API {
if opts.Clock == nil {
opts.Clock = quartz.NewReal()
}
api := &API{
opts: opts,
mu: sync.Mutex{},
}
api.ManifestAPI = &ManifestAPI{
AccessURL: opts.AccessURL,
AppHostname: opts.AppHostname,
ExternalAuthConfigs: opts.ExternalAuthConfigs,
DisableDirectConnections: opts.DisableDirectConnections,
DerpForceWebSockets: opts.DerpForceWebSockets,
AgentFn: api.agent,
Database: opts.Database,
DerpMapFn: opts.DerpMapFn,
WorkspaceID: opts.WorkspaceID,
}
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{
appearanceFetcher: opts.AppearanceFetcher,
}
api.ResourcesMonitoringAPI = &ResourcesMonitoringAPI{
AgentID: opts.AgentID,
WorkspaceID: opts.WorkspaceID,
Clock: opts.Clock,
Database: opts.Database,
NotificationsEnqueuer: opts.NotificationsEnqueuer,
Debounce: 30 * time.Minute,
Config: resourcesmonitor.Config{
NumDatapoints: 20,
CollectionInterval: 10 * time.Second,
Alert: resourcesmonitor.AlertConfig{
MinimumNOKsPercent: 20,
ConsecutiveNOKsPercent: 50,
},
},
}
api.StatsAPI = &StatsAPI{
AgentFn: api.agent,
Database: opts.Database,
Log: opts.Log,
StatsReporter: opts.StatsReporter,
AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval,
Experiments: opts.Experiments,
}
api.LifecycleAPI = &LifecycleAPI{
AgentFn: api.agent,
WorkspaceID: opts.WorkspaceID,
Database: opts.Database,
Log: opts.Log,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
}
api.AppsAPI = &AppsAPI{
AgentFn: api.agent,
Database: opts.Database,
Log: opts.Log,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
}
api.MetadataAPI = &MetadataAPI{
AgentFn: api.agent,
Database: opts.Database,
Pubsub: opts.Pubsub,
Log: opts.Log,
}
api.LogsAPI = &LogsAPI{
AgentFn: api.agent,
Database: opts.Database,
Log: opts.Log,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
PublishWorkspaceAgentLogsUpdateFn: opts.PublishWorkspaceAgentLogsUpdateFn,
}
api.ScriptsAPI = &ScriptsAPI{
Database: opts.Database,
}
api.ConnLogAPI = &ConnLogAPI{
AgentFn: api.agent,
ConnectionLogger: opts.ConnectionLogger,
Database: opts.Database,
Log: opts.Log,
}
api.DRPCService = &tailnet.DRPCService{
CoordPtr: opts.TailnetCoordinator,
Logger: opts.Log,
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
DerpMapFn: opts.DerpMapFn,
NetworkTelemetryHandler: opts.NetworkTelemetryHandler,
}
api.SubAgentAPI = &SubAgentAPI{
OwnerID: opts.OwnerID,
OrganizationID: opts.OrganizationID,
AgentID: opts.AgentID,
AgentFn: api.agent,
Log: opts.Log,
Clock: opts.Clock,
Database: opts.Database,
}
return api
}
func (a *API) Server(ctx context.Context) (*drpcserver.Server, error) {
mux := drpcmux.New()
err := agentproto.DRPCRegisterAgent(mux, a)
if err != nil {
return nil, xerrors.Errorf("register agent API protocol in DRPC mux: %w", err)
}
err = tailnetproto.DRPCRegisterTailnet(mux, a)
if err != nil {
return nil, xerrors.Errorf("register tailnet API protocol in DRPC mux: %w", err)
}
return drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
drpcserver.Options{
Manager: drpcsdk.DefaultDRPCOptions(nil),
Log: func(err error) {
if xerrors.Is(err, io.EOF) {
return
}
a.opts.Log.Debug(ctx, "drpc server error", slog.Error(err))
},
},
), nil
}
func (a *API) Serve(ctx context.Context, l net.Listener) error {
server, err := a.Server(ctx)
if err != nil {
return xerrors.Errorf("create agent API server: %w", err)
}
if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil {
return xerrors.Errorf("initialize resource monitoring: %w", err)
}
return server.Serve(ctx, l)
}
func (a *API) agent(ctx context.Context) (database.WorkspaceAgent, error) {
agent, err := a.opts.Database.GetWorkspaceAgentByID(ctx, a.opts.AgentID)
if err != nil {
return database.WorkspaceAgent{}, xerrors.Errorf("get workspace agent by id %q: %w", a.opts.AgentID, err)
}
return agent, nil
}
func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, wspubsub.WorkspaceEvent{
Kind: kind,
WorkspaceID: a.opts.WorkspaceID,
AgentID: &agent.ID,
})
return nil
}