fix: refactor agent resource monitoring API to avoid excessive calls to DB (#20430)

This should resolve https://github.com/coder/internal/issues/728 by
refactoring the ResourceMonitorAPI struct to only require querying the
resource monitor once for memory and once for volumes, then using the
stored monitors on the API struct from that point on. This should
eliminate the vast majority of calls to `GetWorkspaceByAgentID` and
`FetchVolumesResourceMonitorsUpdatedAfter`/`FetchMemoryResourceMonitorsUpdatedAfter`
(millions of calls per week).

Tests passed, and I ran an instance of coder via a workspace with a
template that added resource monitoring every 10s. Note that this is the
default docker container, so there are other sources of
`GetWorkspaceByAgentID` db queries. Note that this workspace was running
for ~15 minutes at the time I gathered this data.

Over 30s for the `ResourceMonitor` calls:
```
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep ResourceMonitor | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsUpdatedAfter"} 2
100  288k    0  288k    0     0  58.3M      0 --:--:-- --:--:-- --:--:-- 70.4M
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsUpdatedAfter"} 2
coderd_db_query_latencies_seconds_count{query="UpdateMemoryResourceMonitor"} 155
coderd_db_query_latencies_seconds_count{query="UpdateVolumeResourceMonitor"} 155
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep ResourceMonitor | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchMemoryResourceMonitorsUpdatedAfter"} 2
100  288k    0  288k    0     0  34.7M      0 --:--:-- --:--:-- --:--:-- 40.2M
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsByAgentID"} 2
coderd_db_query_latencies_seconds_count{query="FetchVolumesResourceMonitorsUpdatedAfter"} 2
coderd_db_query_latencies_seconds_count{query="UpdateMemoryResourceMonitor"} 158
coderd_db_query_latencies_seconds_count{query="UpdateVolumeResourceMonitor"} 158
```

And over 1m for the `GetWorkspaceAgentByID` calls, the majority are from
the workspace metadata stats updates:
```
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep GetWorkspaceByAgentID | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  284k    0  284k    0     0  42.4M      0 --:--:-- --:--:-- --:--:-- 46.3M
coderd_db_query_latencies_seconds_count{query="GetWorkspaceByAgentID"} 876
coder@callum-coder-2:~/coder$ curl localhost:19090/metrics | grep GetWorkspaceByAgentID | grep count
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  284k    0  284k    0     0  75.4M      0 --:--:-- --:--:-- --:--:-- 92.7M
coderd_db_query_latencies_seconds_count{query="GetWorkspaceByAgentID"} 918
```

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan
2025-10-28 13:38:16 -07:00
committed by GitHub
parent a1e7e105a4
commit 45c43d4ec4
3 changed files with 82 additions and 35 deletions
+4
View File
@@ -239,6 +239,10 @@ func (a *API) Serve(ctx context.Context, l net.Listener) error {
return xerrors.Errorf("create agent API server: %w", err)
}
if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil {
return xerrors.Errorf("initialize resource monitoring: %w", err)
}
return server.Serve(ctx, l)
}
+52 -35
View File
@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"sync"
"time"
"golang.org/x/xerrors"
@@ -33,42 +34,60 @@ type ResourcesMonitoringAPI struct {
Debounce time.Duration
Config resourcesmonitor.Config
// Cache resource monitors on first call to avoid millions of DB queries per day.
memoryMonitor database.WorkspaceAgentMemoryResourceMonitor
volumeMonitors []database.WorkspaceAgentVolumeResourceMonitor
monitorsLock sync.RWMutex
}
func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
memoryMonitor, memoryErr := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if memoryErr != nil && !errors.Is(memoryErr, sql.ErrNoRows) {
return nil, xerrors.Errorf("failed to fetch memory resource monitor: %w", memoryErr)
// InitMonitors fetches resource monitors from the database and caches them.
// This must be called once after creating a ResourcesMonitoringAPI, the context should be
// the agent per-RPC connection context. If fetching fails with a real error (not sql.ErrNoRows), the
// connection should be torn down.
func (a *ResourcesMonitoringAPI) InitMonitors(ctx context.Context) error {
memMon, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("fetch memory resource monitor: %w", err)
}
// If sql.ErrNoRows, memoryMonitor stays as zero value (CreatedAt.IsZero() = true).
// Otherwise, store the fetched monitor.
if err == nil {
a.memoryMonitor = memMon
}
volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
volMons, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
return nil, xerrors.Errorf("failed to fetch volume resource monitors: %w", err)
return xerrors.Errorf("fetch volume resource monitors: %w", err)
}
// 0 length is valid, indicating none configured, since the volume monitors in the DB can be many.
a.volumeMonitors = volMons
return nil
}
func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(_ context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
return &proto.GetResourcesMonitoringConfigurationResponse{
Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
NumDatapoints: a.Config.NumDatapoints,
},
Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
if memoryErr != nil {
if a.memoryMonitor.CreatedAt.IsZero() {
return nil
}
return &proto.GetResourcesMonitoringConfigurationResponse_Memory{
Enabled: memoryMonitor.Enabled,
Enabled: a.memoryMonitor.Enabled,
}
}(),
Volumes: func() []*proto.GetResourcesMonitoringConfigurationResponse_Volume {
volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(volumeMonitors))
for _, monitor := range volumeMonitors {
volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(a.volumeMonitors))
for _, monitor := range a.volumeMonitors {
volumes = append(volumes, &proto.GetResourcesMonitoringConfigurationResponse_Volume{
Enabled: monitor.Enabled,
Path: monitor.Path,
})
}
return volumes
}(),
}, nil
@@ -77,6 +96,10 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
var err error
// Lock for the entire push operation since calls are sequential from the agent
a.monitorsLock.Lock()
defer a.monitorsLock.Unlock()
if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
}
@@ -89,18 +112,7 @@ func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Contex
}
func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
// It is valid for an agent to not have a memory monitor, so we
// do not want to treat it as an error.
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return xerrors.Errorf("fetch memory resource monitor: %w", err)
}
if !monitor.Enabled {
if !a.memoryMonitor.Enabled {
return nil
}
@@ -109,15 +121,15 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
usageDatapoints = append(usageDatapoints, datapoint.Memory)
}
usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)
usageStates := resourcesmonitor.CalculateMemoryUsageStates(a.memoryMonitor, usageDatapoints)
oldState := monitor.State
oldState := a.memoryMonitor.State
newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
debouncedUntil, shouldNotify := a.memoryMonitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
//nolint:gocritic // We need to be able to update the resource monitor here.
err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
err := a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
AgentID: a.AgentID,
State: newState,
UpdatedAt: dbtime.Time(a.Clock.Now()),
@@ -127,6 +139,11 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
return xerrors.Errorf("update workspace monitor: %w", err)
}
// Update cached state
a.memoryMonitor.State = newState
a.memoryMonitor.DebouncedUntil = dbtime.Time(debouncedUntil)
a.memoryMonitor.UpdatedAt = dbtime.Time(a.Clock.Now())
if !shouldNotify {
return nil
}
@@ -143,7 +160,7 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
notifications.TemplateWorkspaceOutOfMemory,
map[string]string{
"workspace": workspace.Name,
"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
"threshold": fmt.Sprintf("%d%%", a.memoryMonitor.Threshold),
},
map[string]any{
// NOTE(DanielleMaywood):
@@ -169,14 +186,9 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
}
func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
return xerrors.Errorf("get or insert volume monitor: %w", err)
}
outOfDiskVolumes := make([]map[string]any, 0)
for _, monitor := range volumeMonitors {
for i, monitor := range a.volumeMonitors {
if !monitor.Enabled {
continue
}
@@ -219,6 +231,11 @@ func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints
}); err != nil {
return xerrors.Errorf("update workspace monitor: %w", err)
}
// Update cached state
a.volumeMonitors[i].State = newState
a.volumeMonitors[i].DebouncedUntil = dbtime.Time(debouncedUntil)
a.volumeMonitors[i].UpdatedAt = dbtime.Time(a.Clock.Now())
}
if len(outOfDiskVolumes) == 0 {
@@ -101,6 +101,9 @@ func TestMemoryResourceMonitorDebounce(t *testing.T) {
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
// When: The monitor is given a state that will trigger NOK
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -304,6 +307,9 @@ func TestMemoryResourceMonitor(t *testing.T) {
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
clock.Set(collectedAt)
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: datapoints,
@@ -337,6 +343,8 @@ func TestMemoryResourceMonitorMissingData(t *testing.T) {
State: database.WorkspaceAgentMonitorStateOK,
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
// When: A datapoint is missing, surrounded by two NOK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
@@ -387,6 +395,9 @@ func TestMemoryResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
// When: A datapoint is missing, surrounded by two OK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -466,6 +477,9 @@ func TestVolumeResourceMonitorDebounce(t *testing.T) {
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
// When:
// - First monitor is in a NOK state
// - Second monitor is in an OK state
@@ -742,6 +756,9 @@ func TestVolumeResourceMonitor(t *testing.T) {
Threshold: tt.thresholdPercent,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
clock.Set(collectedAt)
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: datapoints,
@@ -780,6 +797,9 @@ func TestVolumeResourceMonitorMultiple(t *testing.T) {
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
// When: both of them move to a NOK state
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -832,6 +852,9 @@ func TestVolumeResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
// When: A datapoint is missing, surrounded by two NOK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -891,6 +914,9 @@ func TestVolumeResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))
// When: A datapoint is missing, surrounded by two OK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{