feat: integrate agentAPI with resources monitoring logic (#16438)

As part of the new resources monitoring logic - more specifically, for
OOM & OOD notifications - we need to update the AgentAPI and the
agent logic.

This PR does that. More specifically:
We are updating the AgentAPI & TailnetAPI to version 24 to add two new
methods to the AgentAPI:
- One method to fetch the resources monitoring configuration
- One method to push the datapoints for the resources monitoring.

Also, this PR adds new logic on the agent side: a routine that ticks
periodically, fetching the resources usage on each tick and storing
it in a FIFO-like queue.

Finally, this PR fixes a problem we had with the RBAC logic on the resources
monitoring model, applying the same logic as we have for similar
entities.
This commit is contained in:
Vincent Vielle
2025-02-14 10:28:15 +01:00
committed by GitHub
parent edd982e852
commit bc609d0056
19 changed files with 1830 additions and 218 deletions
+45 -23
View File
@@ -38,8 +38,10 @@ import (
"github.com/coder/coder/v2/agent/agentscripts" "github.com/coder/coder/v2/agent/agentscripts"
"github.com/coder/coder/v2/agent/agentssh" "github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
"github.com/coder/coder/v2/agent/reconnectingpty" "github.com/coder/coder/v2/agent/reconnectingpty"
"github.com/coder/coder/v2/buildinfo" "github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/clistat"
"github.com/coder/coder/v2/cli/gitauth" "github.com/coder/coder/v2/cli/gitauth"
"github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk"
@@ -47,6 +49,7 @@ import (
"github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto" tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
"github.com/coder/retry" "github.com/coder/retry"
) )
@@ -87,8 +90,8 @@ type Options struct {
} }
type Client interface { type Client interface {
ConnectRPC23(ctx context.Context) ( ConnectRPC24(ctx context.Context) (
proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error, proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient24, error,
) )
RewriteDERPMap(derpMap *tailcfg.DERPMap) RewriteDERPMap(derpMap *tailcfg.DERPMap)
} }
@@ -406,7 +409,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
fn() fn()
} }
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23) error { func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
tickerDone := make(chan struct{}) tickerDone := make(chan struct{})
collectDone := make(chan struct{}) collectDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@@ -622,7 +625,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23
// reportLifecycle reports the current lifecycle state once. All state // reportLifecycle reports the current lifecycle state once. All state
// changes are reported in order. // changes are reported in order.
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient23) error { func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
for { for {
select { select {
case <-a.lifecycleUpdate: case <-a.lifecycleUpdate:
@@ -704,7 +707,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
// fetchServiceBannerLoop fetches the service banner on an interval. It will // fetchServiceBannerLoop fetches the service banner on an interval. It will
// not be fetched immediately; the expectation is that it is primed elsewhere // not be fetched immediately; the expectation is that it is primed elsewhere
// (and must be done before the session actually starts). // (and must be done before the session actually starts).
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient23) error { func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
ticker := time.NewTicker(a.announcementBannersRefreshInterval) ticker := time.NewTicker(a.announcementBannersRefreshInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
@@ -740,7 +743,7 @@ func (a *agent) run() (retErr error) {
a.sessionToken.Store(&sessionToken) a.sessionToken.Store(&sessionToken)
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs // ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx) aAPI, tAPI, err := a.client.ConnectRPC24(a.hardCtx)
if err != nil { if err != nil {
return err return err
} }
@@ -757,7 +760,7 @@ func (a *agent) run() (retErr error) {
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI) connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)
connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop, connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{}) bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
if err != nil { if err != nil {
return xerrors.Errorf("fetch service banner: %w", err) return xerrors.Errorf("fetch service banner: %w", err)
@@ -774,7 +777,7 @@ func (a *agent) run() (retErr error) {
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by // sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
// shutdown scripts. // shutdown scripts.
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain, connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
err := a.logSender.SendLoop(ctx, aAPI) err := a.logSender.SendLoop(ctx, aAPI)
if xerrors.Is(err, agentsdk.LogLimitExceededError) { if xerrors.Is(err, agentsdk.LogLimitExceededError) {
// we don't want this error to tear down the API connection and propagate to the // we don't want this error to tear down the API connection and propagate to the
@@ -792,6 +795,25 @@ func (a *agent) run() (retErr error) {
// metadata reporting can cease as soon as we start gracefully shutting down // metadata reporting can cease as soon as we start gracefully shutting down
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata) connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
// The resources monitor is registered with gracefulShutdownBehaviorStop: it
// does not need to keep running once graceful shutdown has started.
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
	// Ask the server what should be monitored before collecting anything.
	monitorCfg, err := aAPI.GetResourcesMonitoringConfiguration(ctx, &proto.GetResourcesMonitoringConfigurationRequest{})
	if err != nil {
		return xerrors.Errorf("failed to get resources monitoring configuration: %w", err)
	}

	statter, err := clistat.New()
	if err != nil {
		return xerrors.Errorf("failed to create resources fetcher: %w", err)
	}

	// Use a local name that does not shadow the resourcesmonitor package.
	monitor := resourcesmonitor.NewResourcesMonitor(
		a.logger.Named("resources_monitor"),
		quartz.NewReal(),
		monitorCfg,
		resourcesmonitor.NewFetcher(statter),
		aAPI,
	)
	return monitor.Start(ctx)
})
// channels to sync goroutines below // channels to sync goroutines below
// handle manifest // handle manifest
// | // |
@@ -814,7 +836,7 @@ func (a *agent) run() (retErr error) {
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK)) connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop, connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
if err := manifestOK.wait(ctx); err != nil { if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err) return xerrors.Errorf("no manifest: %w", err)
} }
@@ -829,7 +851,7 @@ func (a *agent) run() (retErr error) {
a.createOrUpdateNetwork(manifestOK, networkOK)) a.createOrUpdateNetwork(manifestOK, networkOK))
connMan.startTailnetAPI("coordination", gracefulShutdownBehaviorStop, connMan.startTailnetAPI("coordination", gracefulShutdownBehaviorStop,
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error { func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient24) error {
if err := networkOK.wait(ctx); err != nil { if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err) return xerrors.Errorf("no network: %w", err)
} }
@@ -838,7 +860,7 @@ func (a *agent) run() (retErr error) {
) )
connMan.startTailnetAPI("derp map subscriber", gracefulShutdownBehaviorStop, connMan.startTailnetAPI("derp map subscriber", gracefulShutdownBehaviorStop,
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error { func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient24) error {
if err := networkOK.wait(ctx); err != nil { if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err) return xerrors.Errorf("no network: %w", err)
} }
@@ -847,7 +869,7 @@ func (a *agent) run() (retErr error) {
connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop) connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
if err := networkOK.wait(ctx); err != nil { if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err) return xerrors.Errorf("no network: %w", err)
} }
@@ -858,8 +880,8 @@ func (a *agent) run() (retErr error) {
} }
// handleManifest returns a function that fetches and processes the manifest // handleManifest returns a function that fetches and processes the manifest
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { return func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
var ( var (
sentResult = false sentResult = false
err error err error
@@ -968,8 +990,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates // createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
// the tailnet using the information in the manifest // the tailnet using the information in the manifest
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient23) error { func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient24) error {
return func(ctx context.Context, _ proto.DRPCAgentClient23) (retErr error) { return func(ctx context.Context, _ proto.DRPCAgentClient24) (retErr error) {
if err := manifestOK.wait(ctx); err != nil { if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err) return xerrors.Errorf("no manifest: %w", err)
} }
@@ -1273,7 +1295,7 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
// runCoordinator runs a coordinator and returns whether a reconnect // runCoordinator runs a coordinator and returns whether a reconnect
// should occur. // should occur.
func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error { func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient24, network *tailnet.Conn) error {
defer a.logger.Debug(ctx, "disconnected from coordination RPC") defer a.logger.Debug(ctx, "disconnected from coordination RPC")
// we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we // we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we
// gracefully shut down. // gracefully shut down.
@@ -1320,7 +1342,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
} }
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur. // runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error { func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient24, network *tailnet.Conn) error {
defer a.logger.Debug(ctx, "disconnected from derp map RPC") defer a.logger.Debug(ctx, "disconnected from derp map RPC")
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@@ -1690,8 +1712,8 @@ const (
type apiConnRoutineManager struct { type apiConnRoutineManager struct {
logger slog.Logger logger slog.Logger
aAPI proto.DRPCAgentClient23 aAPI proto.DRPCAgentClient24
tAPI tailnetproto.DRPCTailnetClient23 tAPI tailnetproto.DRPCTailnetClient24
eg *errgroup.Group eg *errgroup.Group
stopCtx context.Context stopCtx context.Context
remainCtx context.Context remainCtx context.Context
@@ -1699,7 +1721,7 @@ type apiConnRoutineManager struct {
func newAPIConnRoutineManager( func newAPIConnRoutineManager(
gracefulCtx, hardCtx context.Context, logger slog.Logger, gracefulCtx, hardCtx context.Context, logger slog.Logger,
aAPI proto.DRPCAgentClient23, tAPI tailnetproto.DRPCTailnetClient23, aAPI proto.DRPCAgentClient24, tAPI tailnetproto.DRPCTailnetClient24,
) *apiConnRoutineManager { ) *apiConnRoutineManager {
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still // routines that remain in operation during graceful shutdown use the remainCtx. They'll still
// exit if the errgroup hits an error, which usually means a problem with the conn. // exit if the errgroup hits an error, which usually means a problem with the conn.
@@ -1732,7 +1754,7 @@ func newAPIConnRoutineManager(
// but for Tailnet. // but for Tailnet.
func (a *apiConnRoutineManager) startAgentAPI( func (a *apiConnRoutineManager) startAgentAPI(
name string, behavior gracefulShutdownBehavior, name string, behavior gracefulShutdownBehavior,
f func(context.Context, proto.DRPCAgentClient23) error, f func(context.Context, proto.DRPCAgentClient24) error,
) { ) {
logger := a.logger.With(slog.F("name", name)) logger := a.logger.With(slog.F("name", name))
var ctx context.Context var ctx context.Context
@@ -1769,7 +1791,7 @@ func (a *apiConnRoutineManager) startAgentAPI(
// but for the Agent API. // but for the Agent API.
func (a *apiConnRoutineManager) startTailnetAPI( func (a *apiConnRoutineManager) startTailnetAPI(
name string, behavior gracefulShutdownBehavior, name string, behavior gracefulShutdownBehavior,
f func(context.Context, tailnetproto.DRPCTailnetClient23) error, f func(context.Context, tailnetproto.DRPCTailnetClient24) error,
) { ) {
logger := a.logger.With(slog.F("name", name)) logger := a.logger.With(slog.F("name", name))
var ctx context.Context var ctx context.Context
+32 -3
View File
@@ -96,8 +96,8 @@ func (c *Client) Close() {
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) }) c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
} }
func (c *Client) ConnectRPC23(ctx context.Context) ( func (c *Client) ConnectRPC24(ctx context.Context) (
agentproto.DRPCAgentClient23, proto.DRPCTailnetClient23, error, agentproto.DRPCAgentClient24, proto.DRPCTailnetClient24, error,
) { ) {
conn, lis := drpcsdk.MemTransportPipe() conn, lis := drpcsdk.MemTransportPipe()
c.LastWorkspaceAgent = func() { c.LastWorkspaceAgent = func() {
@@ -171,7 +171,9 @@ type FakeAgentAPI struct {
metadata map[string]agentsdk.Metadata metadata map[string]agentsdk.Metadata
timings []*agentproto.Timing timings []*agentproto.Timing
getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error) getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error)
getResourcesMonitoringConfigurationFunc func() (*agentproto.GetResourcesMonitoringConfigurationResponse, error)
pushResourcesMonitoringUsageFunc func(*agentproto.PushResourcesMonitoringUsageRequest) (*agentproto.PushResourcesMonitoringUsageResponse, error)
} }
func (f *FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) { func (f *FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
@@ -212,6 +214,33 @@ func (f *FakeAgentAPI) GetAnnouncementBanners(context.Context, *agentproto.GetAn
return &agentproto.GetAnnouncementBannersResponse{AnnouncementBanners: bannersProto}, nil return &agentproto.GetAnnouncementBannersResponse{AnnouncementBanners: bannersProto}, nil
} }
// GetResourcesMonitoringConfiguration returns the resources monitoring
// configuration served by the fake. If getResourcesMonitoringConfigurationFunc
// is set, its result is returned as-is; otherwise a fixed default of a
// 10-second collection interval and 20 datapoints is returned. The fake's lock
// is held for the whole call, including while the override function runs.
func (f *FakeAgentAPI) GetResourcesMonitoringConfiguration(_ context.Context, _ *agentproto.GetResourcesMonitoringConfigurationRequest) (*agentproto.GetResourcesMonitoringConfigurationResponse, error) {
	f.Lock()
	defer f.Unlock()

	if override := f.getResourcesMonitoringConfigurationFunc; override != nil {
		return override()
	}

	defaults := &agentproto.GetResourcesMonitoringConfigurationResponse_Config{
		CollectionIntervalSeconds: 10,
		NumDatapoints:             20,
	}
	return &agentproto.GetResourcesMonitoringConfigurationResponse{Config: defaults}, nil
}
// PushResourcesMonitoringUsage accepts a usage push from the agent under test.
// If pushResourcesMonitoringUsageFunc is set, the request is forwarded to it
// and its result is returned; otherwise an empty response and a nil error are
// returned. The fake's lock is held for the whole call.
func (f *FakeAgentAPI) PushResourcesMonitoringUsage(_ context.Context, req *agentproto.PushResourcesMonitoringUsageRequest) (*agentproto.PushResourcesMonitoringUsageResponse, error) {
	f.Lock()
	defer f.Unlock()

	if override := f.pushResourcesMonitoringUsageFunc; override != nil {
		return override(req)
	}
	return &agentproto.PushResourcesMonitoringUsageResponse{}, nil
}
func (f *FakeAgentAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsRequest) (*agentproto.UpdateStatsResponse, error) { func (f *FakeAgentAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsRequest) (*agentproto.UpdateStatsResponse, error) {
f.logger.Debug(ctx, "update stats called", slog.F("req", req)) f.logger.Debug(ctx, "update stats called", slog.F("req", req))
// empty request is sent to get the interval; but our tests don't want empty stats requests // empty request is sent to get the interval; but our tests don't want empty stats requests
+946 -185
View File
File diff suppressed because it is too large Load Diff
+47
View File
@@ -295,6 +295,51 @@ message Timing {
Status status = 6; Status status = 6;
} }
// GetResourcesMonitoringConfigurationRequest is the request message of the
// Agent.GetResourcesMonitoringConfiguration RPC. It currently has no fields.
message GetResourcesMonitoringConfigurationRequest {
}
// GetResourcesMonitoringConfigurationResponse is the response message of the
// Agent.GetResourcesMonitoringConfiguration RPC. It carries the resources
// monitoring configuration for the agent.
message GetResourcesMonitoringConfigurationResponse {
// Config holds the collection settings.
message Config {
// NOTE(review): presumably the number of datapoints the agent keeps in its
// queue - confirm against the agent-side monitor.
int32 num_datapoints = 1;
// Collection interval, expressed in seconds.
int32 collection_interval_seconds = 2;
}
Config config = 1;
// Memory holds the memory monitoring settings.
message Memory {
bool enabled = 1;
}
// Declared optional, so an absent value is distinguishable from a present
// Memory message with enabled = false.
optional Memory memory = 2;
// Volume holds the monitoring settings for one volume, identified by path.
message Volume {
bool enabled = 1;
string path = 2;
}
repeated Volume volumes = 3;
}
// PushResourcesMonitoringUsageRequest is the request message of the
// Agent.PushResourcesMonitoringUsage RPC. It carries a list of collected
// usage datapoints.
message PushResourcesMonitoringUsageRequest {
// Datapoint is one usage sample, stamped with its collection time.
message Datapoint {
// MemoryUsage reports used and total memory.
// NOTE(review): units are not stated here (presumably bytes) - confirm.
message MemoryUsage {
int64 used = 1;
int64 total = 2;
}
// VolumeUsage reports used and total space for one volume. The agent fills
// `volume` with the volume's path.
// NOTE(review): units are not stated here (presumably bytes) - confirm.
message VolumeUsage {
string volume = 1;
int64 used = 2;
int64 total = 3;
}
google.protobuf.Timestamp collected_at = 1;
// Declared optional, so a datapoint may omit the memory sample.
optional MemoryUsage memory = 2;
repeated VolumeUsage volumes = 3;
}
repeated Datapoint datapoints = 1;
}
// PushResourcesMonitoringUsageResponse is the response message of the
// Agent.PushResourcesMonitoringUsage RPC. It currently has no fields.
message PushResourcesMonitoringUsageResponse {
}
service Agent { service Agent {
rpc GetManifest(GetManifestRequest) returns (Manifest); rpc GetManifest(GetManifestRequest) returns (Manifest);
rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner); rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner);
@@ -306,4 +351,6 @@ service Agent {
rpc BatchCreateLogs(BatchCreateLogsRequest) returns (BatchCreateLogsResponse); rpc BatchCreateLogs(BatchCreateLogsRequest) returns (BatchCreateLogsResponse);
rpc GetAnnouncementBanners(GetAnnouncementBannersRequest) returns (GetAnnouncementBannersResponse); rpc GetAnnouncementBanners(GetAnnouncementBannersRequest) returns (GetAnnouncementBannersResponse);
rpc ScriptCompleted(WorkspaceAgentScriptCompletedRequest) returns (WorkspaceAgentScriptCompletedResponse); rpc ScriptCompleted(WorkspaceAgentScriptCompletedRequest) returns (WorkspaceAgentScriptCompletedResponse);
rpc GetResourcesMonitoringConfiguration(GetResourcesMonitoringConfigurationRequest) returns (GetResourcesMonitoringConfigurationResponse);
rpc PushResourcesMonitoringUsage(PushResourcesMonitoringUsageRequest) returns (PushResourcesMonitoringUsageResponse);
} }
+81 -1
View File
@@ -48,6 +48,8 @@ type DRPCAgentClient interface {
BatchCreateLogs(ctx context.Context, in *BatchCreateLogsRequest) (*BatchCreateLogsResponse, error) BatchCreateLogs(ctx context.Context, in *BatchCreateLogsRequest) (*BatchCreateLogsResponse, error)
GetAnnouncementBanners(ctx context.Context, in *GetAnnouncementBannersRequest) (*GetAnnouncementBannersResponse, error) GetAnnouncementBanners(ctx context.Context, in *GetAnnouncementBannersRequest) (*GetAnnouncementBannersResponse, error)
ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error) ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error)
GetResourcesMonitoringConfiguration(ctx context.Context, in *GetResourcesMonitoringConfigurationRequest) (*GetResourcesMonitoringConfigurationResponse, error)
PushResourcesMonitoringUsage(ctx context.Context, in *PushResourcesMonitoringUsageRequest) (*PushResourcesMonitoringUsageResponse, error)
} }
type drpcAgentClient struct { type drpcAgentClient struct {
@@ -150,6 +152,24 @@ func (c *drpcAgentClient) ScriptCompleted(ctx context.Context, in *WorkspaceAgen
return out, nil return out, nil
} }
// GetResourcesMonitoringConfiguration invokes the
// /coder.agent.v2.Agent/GetResourcesMonitoringConfiguration RPC on the client's
// connection. It returns the decoded response, or nil and the error reported
// by Invoke.
func (c *drpcAgentClient) GetResourcesMonitoringConfiguration(ctx context.Context, in *GetResourcesMonitoringConfigurationRequest) (*GetResourcesMonitoringConfigurationResponse, error) {
out := new(GetResourcesMonitoringConfigurationResponse)
err := c.cc.Invoke(ctx, "/coder.agent.v2.Agent/GetResourcesMonitoringConfiguration", drpcEncoding_File_agent_proto_agent_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
// PushResourcesMonitoringUsage invokes the
// /coder.agent.v2.Agent/PushResourcesMonitoringUsage RPC on the client's
// connection. It returns the decoded response, or nil and the error reported
// by Invoke.
func (c *drpcAgentClient) PushResourcesMonitoringUsage(ctx context.Context, in *PushResourcesMonitoringUsageRequest) (*PushResourcesMonitoringUsageResponse, error) {
out := new(PushResourcesMonitoringUsageResponse)
err := c.cc.Invoke(ctx, "/coder.agent.v2.Agent/PushResourcesMonitoringUsage", drpcEncoding_File_agent_proto_agent_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCAgentServer interface { type DRPCAgentServer interface {
GetManifest(context.Context, *GetManifestRequest) (*Manifest, error) GetManifest(context.Context, *GetManifestRequest) (*Manifest, error)
GetServiceBanner(context.Context, *GetServiceBannerRequest) (*ServiceBanner, error) GetServiceBanner(context.Context, *GetServiceBannerRequest) (*ServiceBanner, error)
@@ -161,6 +181,8 @@ type DRPCAgentServer interface {
BatchCreateLogs(context.Context, *BatchCreateLogsRequest) (*BatchCreateLogsResponse, error) BatchCreateLogs(context.Context, *BatchCreateLogsRequest) (*BatchCreateLogsResponse, error)
GetAnnouncementBanners(context.Context, *GetAnnouncementBannersRequest) (*GetAnnouncementBannersResponse, error) GetAnnouncementBanners(context.Context, *GetAnnouncementBannersRequest) (*GetAnnouncementBannersResponse, error)
ScriptCompleted(context.Context, *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error) ScriptCompleted(context.Context, *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error)
GetResourcesMonitoringConfiguration(context.Context, *GetResourcesMonitoringConfigurationRequest) (*GetResourcesMonitoringConfigurationResponse, error)
PushResourcesMonitoringUsage(context.Context, *PushResourcesMonitoringUsageRequest) (*PushResourcesMonitoringUsageResponse, error)
} }
type DRPCAgentUnimplementedServer struct{} type DRPCAgentUnimplementedServer struct{}
@@ -205,9 +227,17 @@ func (s *DRPCAgentUnimplementedServer) ScriptCompleted(context.Context, *Workspa
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
} }
// GetResourcesMonitoringConfiguration is the placeholder implementation: it
// always returns a nil response and an "Unimplemented" error carrying the
// drpcerr.Unimplemented code.
func (s *DRPCAgentUnimplementedServer) GetResourcesMonitoringConfiguration(context.Context, *GetResourcesMonitoringConfigurationRequest) (*GetResourcesMonitoringConfigurationResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
// PushResourcesMonitoringUsage is the placeholder implementation: it always
// returns a nil response and an "Unimplemented" error carrying the
// drpcerr.Unimplemented code.
func (s *DRPCAgentUnimplementedServer) PushResourcesMonitoringUsage(context.Context, *PushResourcesMonitoringUsageRequest) (*PushResourcesMonitoringUsageResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCAgentDescription struct{} type DRPCAgentDescription struct{}
func (DRPCAgentDescription) NumMethods() int { return 10 } func (DRPCAgentDescription) NumMethods() int { return 12 }
func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n { switch n {
@@ -301,6 +331,24 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver,
in1.(*WorkspaceAgentScriptCompletedRequest), in1.(*WorkspaceAgentScriptCompletedRequest),
) )
}, DRPCAgentServer.ScriptCompleted, true }, DRPCAgentServer.ScriptCompleted, true
case 10:
return "/coder.agent.v2.Agent/GetResourcesMonitoringConfiguration", drpcEncoding_File_agent_proto_agent_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCAgentServer).
GetResourcesMonitoringConfiguration(
ctx,
in1.(*GetResourcesMonitoringConfigurationRequest),
)
}, DRPCAgentServer.GetResourcesMonitoringConfiguration, true
case 11:
return "/coder.agent.v2.Agent/PushResourcesMonitoringUsage", drpcEncoding_File_agent_proto_agent_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCAgentServer).
PushResourcesMonitoringUsage(
ctx,
in1.(*PushResourcesMonitoringUsageRequest),
)
}, DRPCAgentServer.PushResourcesMonitoringUsage, true
default: default:
return "", nil, nil, nil, false return "", nil, nil, nil, false
} }
@@ -469,3 +517,35 @@ func (x *drpcAgent_ScriptCompletedStream) SendAndClose(m *WorkspaceAgentScriptCo
} }
return x.CloseSend() return x.CloseSend()
} }
// DRPCAgent_GetResourcesMonitoringConfigurationStream is a drpc.Stream that
// can additionally send a single GetResourcesMonitoringConfigurationResponse
// and close the stream via SendAndClose.
type DRPCAgent_GetResourcesMonitoringConfigurationStream interface {
drpc.Stream
SendAndClose(*GetResourcesMonitoringConfigurationResponse) error
}
// drpcAgent_GetResourcesMonitoringConfigurationStream wraps a drpc.Stream so
// that a SendAndClose method can be attached to it.
type drpcAgent_GetResourcesMonitoringConfigurationStream struct {
drpc.Stream
}
// SendAndClose sends m on the stream and then closes the send side. If sending
// fails, that error is returned and CloseSend is not called.
func (x *drpcAgent_GetResourcesMonitoringConfigurationStream) SendAndClose(m *GetResourcesMonitoringConfigurationResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_agent_proto_agent_proto{}); err != nil {
return err
}
return x.CloseSend()
}
// DRPCAgent_PushResourcesMonitoringUsageStream is a drpc.Stream that can
// additionally send a single PushResourcesMonitoringUsageResponse and close
// the stream via SendAndClose.
type DRPCAgent_PushResourcesMonitoringUsageStream interface {
drpc.Stream
SendAndClose(*PushResourcesMonitoringUsageResponse) error
}
// drpcAgent_PushResourcesMonitoringUsageStream wraps a drpc.Stream so that a
// SendAndClose method can be attached to it.
type drpcAgent_PushResourcesMonitoringUsageStream struct {
drpc.Stream
}
// SendAndClose sends m on the stream and then closes the send side. If sending
// fails, that error is returned and CloseSend is not called.
func (x *drpcAgent_PushResourcesMonitoringUsageStream) SendAndClose(m *PushResourcesMonitoringUsageResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_agent_proto_agent_proto{}); err != nil {
return err
}
return x.CloseSend()
}
+8
View File
@@ -40,3 +40,11 @@ type DRPCAgentClient23 interface {
DRPCAgentClient22 DRPCAgentClient22
ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error) ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error)
} }
// DRPCAgentClient24 is the Agent API at v2.4. It adds the GetResourcesMonitoringConfiguration and
// PushResourcesMonitoringUsage RPCs. Compatible with Coder v2.19+
//
// It embeds DRPCAgentClient23, so every v2.3 method remains available.
type DRPCAgentClient24 interface {
DRPCAgentClient23
// GetResourcesMonitoringConfiguration fetches the resources monitoring configuration.
GetResourcesMonitoringConfiguration(ctx context.Context, in *GetResourcesMonitoringConfigurationRequest) (*GetResourcesMonitoringConfigurationResponse, error)
// PushResourcesMonitoringUsage pushes resources monitoring datapoints.
PushResourcesMonitoringUsage(ctx context.Context, in *PushResourcesMonitoringUsageRequest) (*PushResourcesMonitoringUsageResponse, error)
}
+49
View File
@@ -0,0 +1,49 @@
package resourcesmonitor
import (
"golang.org/x/xerrors"
"github.com/coder/coder/v2/cli/clistat"
)
// Fetcher reports resource usage as (total, used) pairs.
// NOTE(review): units are not fixed by this interface (presumably bytes) - confirm.
type Fetcher interface {
// FetchMemory returns the total and used memory.
FetchMemory() (total int64, used int64, err error)
// FetchVolume returns the total and used space of the given volume.
FetchVolume(volume string) (total int64, used int64, err error)
}
// fetcher embeds a *clistat.Statter and uses its HostMemory and Disk methods
// to provide FetchMemory and FetchVolume.
type fetcher struct {
*clistat.Statter
}
// NewFetcher wraps the given clistat.Statter in a fetcher, which exposes the
// FetchMemory and FetchVolume methods on top of it.
//
//nolint:revive
func NewFetcher(f *clistat.Statter) *fetcher {
	return &fetcher{Statter: f}
}
// FetchMemory reads host memory statistics from the embedded Statter and
// returns them as (total, used), truncated to int64. It returns an error if
// the statistics cannot be read or if the reported total is nil.
func (f *fetcher) FetchMemory() (total int64, used int64, err error) {
	stat, statErr := f.HostMemory(clistat.PrefixDefault)
	switch {
	case statErr != nil:
		return 0, 0, xerrors.Errorf("failed to fetch memory: %w", statErr)
	case stat.Total == nil:
		// Without a total the pair cannot be reported.
		return 0, 0, xerrors.New("memory total is nil - can not fetch memory")
	}

	return int64(*stat.Total), int64(stat.Used), nil
}
// FetchVolume reads disk statistics for the given volume path from the
// embedded Statter and returns them as (total, used), truncated to int64.
//
// It returns an error if the statistics cannot be read or if the reported
// total is nil. The read error is wrapped with the volume path, matching
// FetchMemory's error style; the underlying error stays reachable through
// errors.Is / errors.As.
func (f *fetcher) FetchVolume(volume string) (total int64, used int64, err error) {
	vol, err := f.Disk(clistat.PrefixDefault, volume)
	if err != nil {
		return 0, 0, xerrors.Errorf("failed to fetch volume %q: %w", volume, err)
	}

	if vol.Total == nil {
		// Without a total the pair cannot be reported.
		return 0, 0, xerrors.New("volume total is nil - can not fetch volume")
	}

	return int64(*vol.Total), int64(vol.Used), nil
}
+85
View File
@@ -0,0 +1,85 @@
package resourcesmonitor
import (
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/coder/coder/v2/agent/proto"
)
// Datapoint is one resource usage sample held in the Queue.
type Datapoint struct {
// CollectedAt is the time attached to the sample; it becomes collected_at in
// the proto form.
CollectedAt time.Time
// Memory is the memory sample; it is left out of the proto form when nil.
Memory *MemoryDatapoint
// Volumes holds one entry per sampled volume.
Volumes []*VolumeDatapoint
}
// MemoryDatapoint is the memory part of a Datapoint.
// NOTE(review): units are not stated here (presumably bytes) - confirm.
type MemoryDatapoint struct {
Total int64
Used int64
}
// VolumeDatapoint is the usage of a single volume within a Datapoint.
// NOTE(review): units are not stated here (presumably bytes) - confirm.
type VolumeDatapoint struct {
// Path identifies the volume; it is sent as the proto `volume` field.
Path string
Total int64
Used int64
}
// Queue represents a FIFO queue with a fixed size
//
// Once the queue holds size items, each Push drops the oldest item before
// adding the new one. The type does no locking of its own.
type Queue struct {
// items holds the queued datapoints, oldest first.
items []Datapoint
// size is the maximum number of items kept.
size int
}
// NewQueue creates a new Queue that keeps at most size items.
//
// A negative size is treated as 0 instead of panicking inside make, so a
// bad value cannot crash the caller.
func NewQueue(size int) *Queue {
	if size < 0 {
		size = 0
	}
	return &Queue{
		items: make([]Datapoint, 0, size),
		size:  size,
	}
}
// Push adds a new item to the queue.
//
// When the queue already holds size items, the oldest item is dropped first
// so the length never exceeds size. A queue with a non-positive size
// (including a zero-value Queue) holds nothing: the item is discarded, where
// the previous code panicked by slicing an empty slice.
func (q *Queue) Push(item Datapoint) {
	if q.size <= 0 {
		return
	}

	if len(q.items) >= q.size {
		// Remove the first item (FIFO)
		q.items = q.items[1:]
	}
	q.items = append(q.items, item)
}
// IsFull reports whether the queue currently holds exactly size items.
func (q *Queue) IsFull() bool {
	held := len(q.items)
	return q.size == held
}
// Items returns the queued datapoints, oldest first. The returned slice is the
// queue's own slice, not a copy.
func (q *Queue) Items() []Datapoint {
	current := q.items
	return current
}
// ItemsAsProto converts the queued datapoints, oldest first, into their proto
// form. A datapoint with a nil Memory produces a proto datapoint with no
// memory usage set; each volume's Path is sent as the proto Volume field.
func (q *Queue) ItemsAsProto() []*proto.PushResourcesMonitoringUsageRequest_Datapoint {
	converted := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint, 0, len(q.items))

	for i := range q.items {
		dp := q.items[i]

		var memory *proto.PushResourcesMonitoringUsageRequest_Datapoint_MemoryUsage
		if dp.Memory != nil {
			memory = &proto.PushResourcesMonitoringUsageRequest_Datapoint_MemoryUsage{
				Total: dp.Memory.Total,
				Used:  dp.Memory.Used,
			}
		}

		// Stays nil when the datapoint has no volumes.
		var volumes []*proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage
		for _, vol := range dp.Volumes {
			volumes = append(volumes, &proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage{
				Volume: vol.Path,
				Total:  vol.Total,
				Used:   vol.Used,
			})
		}

		converted = append(converted, &proto.PushResourcesMonitoringUsageRequest_Datapoint{
			CollectedAt: timestamppb.New(dp.CollectedAt),
			Memory:      memory,
			Volumes:     volumes,
		})
	}

	return converted
}
@@ -0,0 +1,92 @@
package resourcesmonitor_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
)
// TestResourceMonitorQueue pushes a varying number of datapoints into a queue
// of capacity 20 and checks both IsFull and the retained items: below
// capacity everything is kept, above capacity only the newest 20 remain.
func TestResourceMonitorQueue(t *testing.T) {
	t.Parallel()

	const capacity = 20

	// sample builds the datapoint pushed at 1-based position i.
	sample := func(i int) resourcesmonitor.Datapoint {
		return resourcesmonitor.Datapoint{
			Memory: &resourcesmonitor.MemoryDatapoint{
				Total: int64(i),
				Used:  int64(i),
			},
		}
	}
	// samples builds the datapoints for positions first..last (inclusive).
	samples := func(first, last int) []resourcesmonitor.Datapoint {
		var out []resourcesmonitor.Datapoint
		for i := first; i <= last; i++ {
			out = append(out, sample(i))
		}
		return out
	}

	cases := []struct {
		name      string
		pushCount int
		expected  []resourcesmonitor.Datapoint
	}{
		{
			name:      "Push zero",
			pushCount: 0,
			// Must stay an empty, non-nil slice to compare equal to Items().
			expected: []resourcesmonitor.Datapoint{},
		},
		{
			name:      "Push less than capacity",
			pushCount: 3,
			expected:  samples(1, 3),
		},
		{
			name:      "Push exactly capacity",
			pushCount: 20,
			expected:  samples(1, 20),
		},
		{
			name:      "Push more than capacity",
			pushCount: 25,
			expected:  samples(6, 25),
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			queue := resourcesmonitor.NewQueue(capacity)
			for n := 1; n <= tc.pushCount; n++ {
				queue.Push(sample(n))
			}

			if tc.pushCount >= capacity {
				require.True(t, queue.IsFull())
				require.Len(t, queue.Items(), capacity)
			} else {
				require.False(t, queue.IsFull())
			}

			require.EqualValues(t, tc.expected, queue.Items())
		})
	}
}
@@ -0,0 +1,93 @@
package resourcesmonitor
import (
"context"
"time"
"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/quartz"
)
// monitor periodically samples resource usage and pushes the collected
// datapoints through datapointsPusher.
type monitor struct {
	// logger receives fetch and push failures.
	logger slog.Logger
	// clock schedules the collection ticker and timestamps each datapoint.
	clock quartz.Clock
	// config provides the collection interval, the queue size and which
	// memory/volume monitors are enabled.
	config *proto.GetResourcesMonitoringConfigurationResponse
	// resourcesFetcher reads the current memory and volume usage.
	resourcesFetcher Fetcher
	// datapointsPusher sends the queued datapoints once the queue is full.
	datapointsPusher datapointsPusher
	// queue holds the most recent datapoints.
	queue *Queue
}
// NewResourcesMonitor builds a monitor from its dependencies. The internal
// queue is sized from config.Config.NumDatapoints, so config and
// config.Config must be non-nil.
//
//nolint:revive
func NewResourcesMonitor(logger slog.Logger, clock quartz.Clock, config *proto.GetResourcesMonitoringConfigurationResponse, resourcesFetcher Fetcher, datapointsPusher datapointsPusher) *monitor {
	queueSize := int(config.Config.NumDatapoints)

	m := &monitor{
		logger:           logger,
		clock:            clock,
		config:           config,
		resourcesFetcher: resourcesFetcher,
		datapointsPusher: datapointsPusher,
		queue:            NewQueue(queueSize),
	}
	return m
}
// datapointsPusher is the subset of the agent API the monitor needs: a way to
// send a batch of collected datapoints.
type datapointsPusher interface {
	// PushResourcesMonitoringUsage sends the datapoints carried by req.
	PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error)
}
// Start registers a ticker on the monitor's clock that fires every
// CollectionIntervalSeconds. On each tick it collects one datapoint (memory
// if enabled, plus every enabled volume), pushes it into the queue and, once
// the queue is full, sends the whole queue through the datapoints pusher.
//
// Fetch and push failures are only logged: the tick callback always returns
// nil so the monitoring keeps running. Start does not wait on the ticker and
// always returns nil.
func (m *monitor) Start(ctx context.Context) error {
	interval := time.Duration(m.config.Config.CollectionIntervalSeconds) * time.Second

	collectAndPush := func() error {
		sample := Datapoint{
			CollectedAt: m.clock.Now(),
			Volumes:     make([]*VolumeDatapoint, 0, len(m.config.Volumes)),
		}

		if memCfg := m.config.Memory; memCfg != nil && memCfg.Enabled {
			total, used, err := m.resourcesFetcher.FetchMemory()
			if err == nil {
				sample.Memory = &MemoryDatapoint{Total: total, Used: used}
			} else {
				m.logger.Error(ctx, "failed to fetch memory", slog.Error(err))
			}
		}

		for _, volCfg := range m.config.Volumes {
			if !volCfg.Enabled {
				continue
			}

			total, used, err := m.resourcesFetcher.FetchVolume(volCfg.Path)
			if err != nil {
				// Skip this volume only; the rest of the sample is still kept.
				m.logger.Error(ctx, "failed to fetch volume", slog.Error(err))
				continue
			}

			sample.Volumes = append(sample.Volumes, &VolumeDatapoint{
				Path:  volCfg.Path,
				Total: total,
				Used:  used,
			})
		}

		m.queue.Push(sample)
		if !m.queue.IsFull() {
			return nil
		}

		request := &proto.PushResourcesMonitoringUsageRequest{
			Datapoints: m.queue.ItemsAsProto(),
		}
		if _, err := m.datapointsPusher.PushResourcesMonitoringUsage(ctx, request); err != nil {
			// A failed push must not stop the monitoring: log it and carry
			// on. The next tick evicts the oldest datapoint and retries with
			// the updated queue.
			m.logger.Error(ctx, "failed to push resources monitoring usage", slog.Error(err))
		}
		return nil
	}

	m.clock.TickerFunc(ctx, interval, collectAndPush, "resources_monitor")

	return nil
}
@@ -0,0 +1,235 @@
package resourcesmonitor_test
import (
"context"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
"github.com/coder/quartz"
)
// datapointsPusherMock is a test double for the monitor's datapoints pusher
// that forwards every call to PushResourcesMonitoringUsageFunc.
type datapointsPusherMock struct {
	PushResourcesMonitoringUsageFunc func(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error)
}

// PushResourcesMonitoringUsage delegates to the configured function, which
// must be set before the mock is used.
func (m *datapointsPusherMock) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
	handler := m.PushResourcesMonitoringUsageFunc
	return handler(ctx, req)
}
// fetcher is a canned resources fetcher for tests: each method returns the
// values stored in the struct, including the configured error.
type fetcher struct {
	totalMemory int64
	usedMemory  int64
	totalVolume int64
	usedVolume  int64

	errMemory error
	errVolume error
}

// FetchMemory returns the configured memory figures and errMemory.
func (f *fetcher) FetchMemory() (total int64, used int64, err error) {
	total, used, err = f.totalMemory, f.usedMemory, f.errMemory
	return total, used, err
}

// FetchVolume returns the configured volume figures and errVolume; the
// requested path is ignored.
func (f *fetcher) FetchVolume(_ string) (total int64, used int64, err error) {
	total, used, err = f.totalVolume, f.usedVolume, f.errVolume
	return total, used, err
}
// TestPushResourcesMonitoringWithConfig drives the monitor with a mock clock
// for a number of ticks and checks how many times the datapoints were pushed.
// Every case uses the same configuration: 20 datapoints, a one second
// interval and a single enabled volume at "/".
func TestPushResourcesMonitoringWithConfig(t *testing.T) {
	t.Parallel()

	type pushFunc = func(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error)

	// rootVolumeConfig returns a fresh copy of the configuration shared by
	// all cases.
	rootVolumeConfig := func() *proto.GetResourcesMonitoringConfigurationResponse {
		return &proto.GetResourcesMonitoringConfigurationResponse{
			Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
				NumDatapoints:             20,
				CollectionIntervalSeconds: 1,
			},
			Volumes: []*proto.GetResourcesMonitoringConfigurationResponse_Volume{
				{
					Enabled: true,
					Path:    "/",
				},
			},
		}
	}
	// healthyFetcher returns a fetcher that never fails.
	healthyFetcher := func() *fetcher {
		return &fetcher{
			totalMemory: 16000,
			usedMemory:  8000,
			totalVolume: 100000,
			usedVolume:  50000,
		}
	}
	// acceptAll is a pusher that accepts every request.
	acceptAll := func(_ context.Context, _ *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
		return &proto.PushResourcesMonitoringUsageResponse{}, nil
	}

	cases := []struct {
		name             string
		config           *proto.GetResourcesMonitoringConfigurationResponse
		datapointsPusher pushFunc
		fetcher          resourcesmonitor.Fetcher
		numTicks         int
	}{
		{
			name:             "SuccessfulMonitoring",
			config:           rootVolumeConfig(),
			datapointsPusher: acceptAll,
			fetcher:          healthyFetcher(),
			numTicks:         20,
		},
		{
			name:             "SuccessfulMonitoringLongRun",
			config:           rootVolumeConfig(),
			datapointsPusher: acceptAll,
			fetcher:          healthyFetcher(),
			numTicks:         60,
		},
		{
			// We want to make sure that even if the datapointsPusher fails, the monitoring continues.
			name:   "ErrorPushingDatapoints",
			config: rootVolumeConfig(),
			datapointsPusher: func(_ context.Context, _ *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
				return nil, assert.AnError
			},
			fetcher:  healthyFetcher(),
			numTicks: 60,
		},
		{
			// If one of the resources fails to be fetched, the datapoints still should be pushed with the other resources.
			name:   "ErrorFetchingMemory",
			config: rootVolumeConfig(),
			datapointsPusher: func(_ context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
				require.Len(t, req.Datapoints, 20)
				first := req.Datapoints[0]
				require.Nil(t, first.Memory)
				require.NotNil(t, first.Volumes)
				require.Equal(t, &proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage{
					Volume: "/",
					Total:  100000,
					Used:   50000,
				}, first.Volumes[0])

				return &proto.PushResourcesMonitoringUsageResponse{}, nil
			},
			// Memory figures are left at zero; only the error matters.
			fetcher: &fetcher{
				errMemory:   assert.AnError,
				totalVolume: 100000,
				usedVolume:  50000,
			},
			numTicks: 20,
		},
		{
			// If one of the resources fails to be fetched, the datapoints still should be pushed with the other resources.
			name:   "ErrorFetchingVolume",
			config: rootVolumeConfig(),
			datapointsPusher: func(_ context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
				require.Len(t, req.Datapoints, 20)
				require.Len(t, req.Datapoints[0].Volumes, 0)

				return &proto.PushResourcesMonitoringUsageResponse{}, nil
			},
			// Volume figures are left at zero; only the error matters.
			fetcher: &fetcher{
				totalMemory: 16000,
				usedMemory:  8000,
				errVolume:   assert.AnError,
			},
			numTicks: 20,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			logger := slog.Make(sloghuman.Sink(os.Stdout))
			clk := quartz.NewMock(t)
			pushCalls := 0

			// Count every push before handing it to the case's own pusher.
			countingPusher := &datapointsPusherMock{
				PushResourcesMonitoringUsageFunc: func(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
					pushCalls++
					return tc.datapointsPusher(ctx, req)
				},
			}

			monitor := resourcesmonitor.NewResourcesMonitor(logger, clk, tc.config, tc.fetcher, countingPusher)
			require.NoError(t, monitor.Start(ctx))

			for tick := 0; tick < tc.numTicks; tick++ {
				_, waiter := clk.AdvanceNext()
				require.NoError(t, waiter.Wait(ctx))
			}

			// The queue becomes full on tick number NumDatapoints and stays
			// full afterwards, so there is exactly one push per tick from
			// that tick on.
			wantCalls := tc.numTicks - int(tc.config.Config.NumDatapoints) + 1
			require.Equal(t, wantCalls, pushCalls)
			cancel()
		})
	}
}
+7
View File
@@ -42,6 +42,7 @@ type API struct {
*LifecycleAPI *LifecycleAPI
*AppsAPI *AppsAPI
*MetadataAPI *MetadataAPI
*ResourcesMonitoringAPI
*LogsAPI *LogsAPI
*ScriptsAPI *ScriptsAPI
*tailnet.DRPCService *tailnet.DRPCService
@@ -102,6 +103,12 @@ func New(opts Options) *API {
appearanceFetcher: opts.AppearanceFetcher, appearanceFetcher: opts.AppearanceFetcher,
} }
api.ResourcesMonitoringAPI = &ResourcesMonitoringAPI{
Log: opts.Log,
AgentID: opts.AgentID,
Database: opts.Database,
}
api.StatsAPI = &StatsAPI{ api.StatsAPI = &StatsAPI{
AgentFn: api.agent, AgentFn: api.agent,
Database: opts.Database, Database: opts.Database,
+67
View File
@@ -0,0 +1,67 @@
package agentapi
import (
"context"
"database/sql"
"errors"
"golang.org/x/xerrors"
"github.com/google/uuid"
"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
)
// ResourcesMonitoringAPI implements the resources-monitoring RPCs of the
// agent API for a single agent.
type ResourcesMonitoringAPI struct {
	// AgentID identifies the agent whose monitors are looked up.
	AgentID uuid.UUID
	// Database is used to fetch the memory and volume monitors.
	Database database.Store
	// Log receives the pushed usage reports.
	Log slog.Logger
}
// GetResourcesMonitoringConfiguration returns the monitoring configuration of
// the agent: a fixed collection interval (10 seconds) and datapoint count
// (20), the memory monitor if one exists, and every volume monitor.
//
// A missing memory monitor (sql.ErrNoRows) is not an error; the response then
// simply carries no Memory section. Any other database failure is returned
// wrapped.
func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
	memoryMonitor, memoryErr := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
	if memoryErr != nil && !errors.Is(memoryErr, sql.ErrNoRows) {
		return nil, xerrors.Errorf("failed to fetch memory resource monitor: %w", memoryErr)
	}

	volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
	if err != nil {
		return nil, xerrors.Errorf("failed to fetch volume resource monitors: %w", err)
	}

	resp := &proto.GetResourcesMonitoringConfigurationResponse{
		Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
			CollectionIntervalSeconds: 10,
			NumDatapoints:             20,
		},
		Volumes: make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(volumeMonitors)),
	}

	// memoryErr can only be nil or sql.ErrNoRows at this point; Memory stays
	// nil when no monitor was found.
	if memoryErr == nil {
		resp.Memory = &proto.GetResourcesMonitoringConfigurationResponse_Memory{
			Enabled: memoryMonitor.Enabled,
		}
	}

	for _, vm := range volumeMonitors {
		resp.Volumes = append(resp.Volumes, &proto.GetResourcesMonitoringConfigurationResponse_Volume{
			Enabled: vm.Enabled,
			Path:    vm.Path,
		})
	}

	return resp, nil
}
// PushResourcesMonitoringUsage receives a batch of usage datapoints from the
// agent. It currently only logs the request at info level and replies with an
// empty response.
func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
	requestField := slog.F("request", req)
	a.Log.Info(ctx, "resources monitoring usage received", requestField)

	ack := &proto.PushResourcesMonitoringUsageResponse{}
	return ack, nil
}
+16 -2
View File
@@ -184,6 +184,8 @@ var (
rbac.ResourceGroup.Type: {policy.ActionRead}, rbac.ResourceGroup.Type: {policy.ActionRead},
// Provisionerd creates notification messages // Provisionerd creates notification messages
rbac.ResourceNotificationMessage.Type: {policy.ActionCreate, policy.ActionRead}, rbac.ResourceNotificationMessage.Type: {policy.ActionCreate, policy.ActionRead},
// Provisionerd creates workspaces resources monitor
rbac.ResourceWorkspaceAgentResourceMonitor.Type: {policy.ActionCreate},
}), }),
Org: map[string][]rbac.Permission{}, Org: map[string][]rbac.Permission{},
User: []rbac.Permission{}, User: []rbac.Permission{},
@@ -1392,7 +1394,13 @@ func (q *querier) FavoriteWorkspace(ctx context.Context, id uuid.UUID) error {
} }
func (q *querier) FetchMemoryResourceMonitorsByAgentID(ctx context.Context, agentID uuid.UUID) (database.WorkspaceAgentMemoryResourceMonitor, error) { func (q *querier) FetchMemoryResourceMonitorsByAgentID(ctx context.Context, agentID uuid.UUID) (database.WorkspaceAgentMemoryResourceMonitor, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceWorkspaceAgentResourceMonitor); err != nil { workspace, err := q.db.GetWorkspaceByAgentID(ctx, agentID)
if err != nil {
return database.WorkspaceAgentMemoryResourceMonitor{}, err
}
err = q.authorizeContext(ctx, policy.ActionRead, workspace)
if err != nil {
return database.WorkspaceAgentMemoryResourceMonitor{}, err return database.WorkspaceAgentMemoryResourceMonitor{}, err
} }
@@ -1407,7 +1415,13 @@ func (q *querier) FetchNewMessageMetadata(ctx context.Context, arg database.Fetc
} }
func (q *querier) FetchVolumesResourceMonitorsByAgentID(ctx context.Context, agentID uuid.UUID) ([]database.WorkspaceAgentVolumeResourceMonitor, error) { func (q *querier) FetchVolumesResourceMonitorsByAgentID(ctx context.Context, agentID uuid.UUID) ([]database.WorkspaceAgentVolumeResourceMonitor, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceWorkspaceAgentResourceMonitor); err != nil { workspace, err := q.db.GetWorkspaceByAgentID(ctx, agentID)
if err != nil {
return nil, err
}
err = q.authorizeContext(ctx, policy.ActionRead, workspace)
if err != nil {
return nil, err return nil, err
} }
+2 -2
View File
@@ -4772,7 +4772,7 @@ func (s *MethodTestSuite) TestResourcesMonitor() {
monitor, err := db.FetchMemoryResourceMonitorsByAgentID(context.Background(), agt.ID) monitor, err := db.FetchMemoryResourceMonitorsByAgentID(context.Background(), agt.ID)
require.NoError(s.T(), err) require.NoError(s.T(), err)
check.Args(agt.ID).Asserts(rbac.ResourceWorkspaceAgentResourceMonitor, policy.ActionRead).Returns(monitor) check.Args(agt.ID).Asserts(w, policy.ActionRead).Returns(monitor)
})) }))
s.Run("FetchVolumesResourceMonitorsByAgentID", s.Subtest(func(db database.Store, check *expects) { s.Run("FetchVolumesResourceMonitorsByAgentID", s.Subtest(func(db database.Store, check *expects) {
@@ -4813,6 +4813,6 @@ func (s *MethodTestSuite) TestResourcesMonitor() {
monitors, err := db.FetchVolumesResourceMonitorsByAgentID(context.Background(), agt.ID) monitors, err := db.FetchVolumesResourceMonitorsByAgentID(context.Background(), agt.ID)
require.NoError(s.T(), err) require.NoError(s.T(), err)
check.Args(agt.ID).Asserts(rbac.ResourceWorkspaceAgentResourceMonitor, policy.ActionRead).Returns(monitors) check.Args(agt.ID).Asserts(w, policy.ActionRead).Returns(monitors)
})) }))
} }
+1 -1
View File
@@ -2234,7 +2234,7 @@ func requireGetManifest(ctx context.Context, t testing.TB, aAPI agentproto.DRPCA
} }
func postStartup(ctx context.Context, t testing.TB, client agent.Client, startup *agentproto.Startup) error { func postStartup(ctx context.Context, t testing.TB, client agent.Client, startup *agentproto.Startup) error {
aAPI, _, err := client.ConnectRPC23(ctx) aAPI, _, err := client.ConnectRPC24(ctx)
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
cErr := aAPI.DRPCConn().Close() cErr := aAPI.DRPCConn().Close()
+12
View File
@@ -229,6 +229,18 @@ func (c *Client) ConnectRPC23(ctx context.Context) (
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
} }
// ConnectRPC24 dials the agent RPC endpoint at API version 2.4 and returns
// Agent API and Tailnet API clients sharing that connection. Use it when you
// want to be maximally compatible with Coderd Release Versions from 2.xx+.
// TODO @vincent: define version
func (c *Client) ConnectRPC24(ctx context.Context) (
	proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient24, error,
) {
	version := apiversion.New(2, 4)

	rpcConn, err := c.connectRPCVersion(ctx, version)
	if err != nil {
		return nil, nil, err
	}

	agentClient := proto.NewDRPCAgentClient(rpcConn)
	tailnetClient := tailnetproto.NewDRPCTailnetClient(rpcConn)
	return agentClient, tailnetClient, nil
}
// ConnectRPC connects to the workspace agent API and tailnet API // ConnectRPC connects to the workspace agent API and tailnet API
func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) { func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
return c.connectRPCVersion(ctx, proto.CurrentVersion) return c.connectRPCVersion(ctx, proto.CurrentVersion)
+6
View File
@@ -34,3 +34,9 @@ type DRPCTailnetClient23 interface {
RefreshResumeToken(ctx context.Context, in *RefreshResumeTokenRequest) (*RefreshResumeTokenResponse, error) RefreshResumeToken(ctx context.Context, in *RefreshResumeTokenRequest) (*RefreshResumeTokenResponse, error)
WorkspaceUpdates(ctx context.Context, in *WorkspaceUpdatesRequest) (DRPCTailnet_WorkspaceUpdatesClient, error) WorkspaceUpdates(ctx context.Context, in *WorkspaceUpdatesRequest) (DRPCTailnet_WorkspaceUpdatesClient, error)
} }
// DRPCTailnetClient24 is the Tailnet API at v2.4. It embeds DRPCTailnetClient23
// without adding any method: the Tailnet API is functionally identical to 2.3,
// because the v2.4 change was to the Agent API (ResourcesMonitoring methods).
type DRPCTailnetClient24 interface {
DRPCTailnetClient23
}
+6 -1
View File
@@ -38,9 +38,14 @@ import (
// shipped in Coder v2.16.0, but we forgot to increment the API version. If // shipped in Coder v2.16.0, but we forgot to increment the API version. If
// you dial for API v2.2, you MAY be connected to a server that supports // you dial for API v2.2, you MAY be connected to a server that supports
// ScriptCompleted, but be prepared to process "unsupported" errors.) // ScriptCompleted, but be prepared to process "unsupported" errors.)
//
// API v2.4:
// - Shipped in Coder v2.{{placeholder}} // TODO Vincent: Replace with the correct version
// - Added support for GetResourcesMonitoringConfiguration and
// PushResourcesMonitoringUsage RPCs on the Agent API.
const ( const (
CurrentMajor = 2 CurrentMajor = 2
CurrentMinor = 3 CurrentMinor = 4
) )
var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor) var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor)