Files
coder/coderd/agentapi/resources_monitoring.go
Callum Styan 45c43d4ec4 fix: refactor agent resource monitoring API to avoid excessive calls to DB (#20430)
This should resolve https://github.com/coder/internal/issues/728 by
refactoring the ResourceMonitorAPI struct to only require querying the
resource monitor once for memory and once for volumes, then using the
stored monitors on the API struct from that point on. This should
eliminate the vast majority of calls to `GetWorkspaceByAgentID` and
`FetchVolumesResourceMonitorsUpdatedAfter`/`FetchMemoryResourceMonitorsUpdatedAfter`
(millions of calls per week).

Tests passed, and I ran an instance of coder via a workspace with a
template that added resource monitoring every 10s. Note that this is the
default docker container, so there are other sources of
`GetWorkspaceByAgentID` db queries. Note that this workspace was running
for ~15 minutes at the time I gathered this data.

Over 30s for the `ResourceMonitor` calls:
```
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep ResourceMonitor | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsUpdatedAfter"} 2
100  288k    0  288k    0     0  58.3M      0 --:--:-- --:--:-- --:--:-- 70.4M
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsUpdatedAfter"} 2
coderd_db_query_latencies_seconds_count{query="UpdateMemoryResourceMonitor"} 155
coderd_db_query_latencies_seconds_count{query="UpdateVolumeResourceMonitor"} 155
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep ResourceMonitor | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsUpdatedAfter"} 2
100  288k    0  288k    0     0  34.7M      0 --:--:-- --:--:-- --:--:-- 40.2M
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsUpdatedAfter"} 2
coderd_db_query_latencies_seconds_count{query="UpdateMemoryResourceMonitor"} 158
coderd_db_query_latencies_seconds_count{query="UpdateVolumeResourceMonitor"} 158
```

And over 1m for the `GetWorkspaceAgentByID` calls, the majority are from
the workspace metadata stats updates:
```
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep GetWorkspaceByAgentID | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  284k    0  284k    0     0  42.4M      0 --:--:-- --:--:-- --:--:-- 46.3M
coderd_db_query_latencies_seconds_count{query="GetWorkspaceByAgentID"} 876
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep GetWorkspaceByAgentID | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  284k    0  284k    0     0  75.4M      0 --:--:-- --:--:-- --:--:-- 92.7M
coderd_db_query_latencies_seconds_count{query="GetWorkspaceByAgentID"} 918
```

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
2025-10-28 13:38:16 -07:00

280 lines
9.3 KiB
Go

package agentapi
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/google/uuid"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/quartz"
)
type ResourcesMonitoringAPI struct {
AgentID uuid.UUID
WorkspaceID uuid.UUID
Log slog.Logger
Clock quartz.Clock
Database database.Store
NotificationsEnqueuer notifications.Enqueuer
Debounce time.Duration
Config resourcesmonitor.Config
// Cache resource monitors on first call to avoid millions of DB queries per day.
memoryMonitor database.WorkspaceAgentMemoryResourceMonitor
volumeMonitors []database.WorkspaceAgentVolumeResourceMonitor
monitorsLock sync.RWMutex
}
// InitMonitors fetches resource monitors from the database and caches them.
// This must be called once after creating a ResourcesMonitoringAPI, the context should be
// the agent per-RPC connection context. If fetching fails with a real error (not sql.ErrNoRows), the
// connection should be torn down.
func (a *ResourcesMonitoringAPI) InitMonitors(ctx context.Context) error {
memMon, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("fetch memory resource monitor: %w", err)
}
// If sql.ErrNoRows, memoryMonitor stays as zero value (CreatedAt.IsZero() = true).
// Otherwise, store the fetched monitor.
if err == nil {
a.memoryMonitor = memMon
}
volMons, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
return xerrors.Errorf("fetch volume resource monitors: %w", err)
}
// 0 length is valid, indicating none configured, since the volume monitors in the DB can be many.
a.volumeMonitors = volMons
return nil
}
func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(_ context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
return &proto.GetResourcesMonitoringConfigurationResponse{
Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
NumDatapoints: a.Config.NumDatapoints,
},
Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
if a.memoryMonitor.CreatedAt.IsZero() {
return nil
}
return &proto.GetResourcesMonitoringConfigurationResponse_Memory{
Enabled: a.memoryMonitor.Enabled,
}
}(),
Volumes: func() []*proto.GetResourcesMonitoringConfigurationResponse_Volume {
volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(a.volumeMonitors))
for _, monitor := range a.volumeMonitors {
volumes = append(volumes, &proto.GetResourcesMonitoringConfigurationResponse_Volume{
Enabled: monitor.Enabled,
Path: monitor.Path,
})
}
return volumes
}(),
}, nil
}
func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
var err error
// Lock for the entire push operation since calls are sequential from the agent
a.monitorsLock.Lock()
defer a.monitorsLock.Unlock()
if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
}
if volumeErr := a.monitorVolumes(ctx, req.Datapoints); volumeErr != nil {
err = errors.Join(err, xerrors.Errorf("monitor volume: %w", volumeErr))
}
return &proto.PushResourcesMonitoringUsageResponse{}, err
}
func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
if !a.memoryMonitor.Enabled {
return nil
}
usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_MemoryUsage, 0, len(datapoints))
for _, datapoint := range datapoints {
usageDatapoints = append(usageDatapoints, datapoint.Memory)
}
usageStates := resourcesmonitor.CalculateMemoryUsageStates(a.memoryMonitor, usageDatapoints)
oldState := a.memoryMonitor.State
newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
debouncedUntil, shouldNotify := a.memoryMonitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
//nolint:gocritic // We need to be able to update the resource monitor here.
err := a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
AgentID: a.AgentID,
State: newState,
UpdatedAt: dbtime.Time(a.Clock.Now()),
DebouncedUntil: dbtime.Time(debouncedUntil),
})
if err != nil {
return xerrors.Errorf("update workspace monitor: %w", err)
}
// Update cached state
a.memoryMonitor.State = newState
a.memoryMonitor.DebouncedUntil = dbtime.Time(debouncedUntil)
a.memoryMonitor.UpdatedAt = dbtime.Time(a.Clock.Now())
if !shouldNotify {
return nil
}
workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
if err != nil {
return xerrors.Errorf("get workspace by id: %w", err)
}
_, err = a.NotificationsEnqueuer.EnqueueWithData(
// nolint:gocritic // We need to be able to send the notification.
dbauthz.AsNotifier(ctx),
workspace.OwnerID,
notifications.TemplateWorkspaceOutOfMemory,
map[string]string{
"workspace": workspace.Name,
"threshold": fmt.Sprintf("%d%%", a.memoryMonitor.Threshold),
},
map[string]any{
// NOTE(DanielleMaywood):
// When notifications are enqueued, they are checked to be
// unique within a single day. This means that if we attempt
// to send two OOM notifications for the same workspace on
// the same day, the enqueuer will prevent us from sending
// a second one. We are inject a timestamp to make the
// notifications appear different enough to circumvent this
// deduplication logic.
"timestamp": a.Clock.Now(),
},
"workspace-monitor-memory",
workspace.ID,
workspace.OwnerID,
workspace.OrganizationID,
)
if err != nil {
return xerrors.Errorf("notify workspace OOM: %w", err)
}
return nil
}
func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
outOfDiskVolumes := make([]map[string]any, 0)
for i, monitor := range a.volumeMonitors {
if !monitor.Enabled {
continue
}
usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage, 0, len(datapoints))
for _, datapoint := range datapoints {
var usage *proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage
for _, volume := range datapoint.Volumes {
if volume.Volume == monitor.Path {
usage = volume
break
}
}
usageDatapoints = append(usageDatapoints, usage)
}
usageStates := resourcesmonitor.CalculateVolumeUsageStates(monitor, usageDatapoints)
oldState := monitor.State
newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
if shouldNotify {
outOfDiskVolumes = append(outOfDiskVolumes, map[string]any{
"path": monitor.Path,
"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
})
}
//nolint:gocritic // We need to be able to update the resource monitor here.
if err := a.Database.UpdateVolumeResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateVolumeResourceMonitorParams{
AgentID: a.AgentID,
Path: monitor.Path,
State: newState,
UpdatedAt: dbtime.Time(a.Clock.Now()),
DebouncedUntil: dbtime.Time(debouncedUntil),
}); err != nil {
return xerrors.Errorf("update workspace monitor: %w", err)
}
// Update cached state
a.volumeMonitors[i].State = newState
a.volumeMonitors[i].DebouncedUntil = dbtime.Time(debouncedUntil)
a.volumeMonitors[i].UpdatedAt = dbtime.Time(a.Clock.Now())
}
if len(outOfDiskVolumes) == 0 {
return nil
}
workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
if err != nil {
return xerrors.Errorf("get workspace by id: %w", err)
}
if _, err := a.NotificationsEnqueuer.EnqueueWithData(
// nolint:gocritic // We need to be able to send the notification.
dbauthz.AsNotifier(ctx),
workspace.OwnerID,
notifications.TemplateWorkspaceOutOfDisk,
map[string]string{
"workspace": workspace.Name,
},
map[string]any{
"volumes": outOfDiskVolumes,
// NOTE(DanielleMaywood):
// When notifications are enqueued, they are checked to be
// unique within a single day. This means that if we attempt
// to send two OOM notifications for the same workspace on
// the same day, the enqueuer will prevent us from sending
// a second one. We are inject a timestamp to make the
// notifications appear different enough to circumvent this
// deduplication logic.
"timestamp": a.Clock.Now(),
},
"workspace-monitor-volumes",
workspace.ID,
workspace.OwnerID,
workspace.OrganizationID,
); err != nil {
return xerrors.Errorf("notify workspace OOD: %w", err)
}
return nil
}