diff --git a/coderd/database/db2sdk/db2sdk.go b/coderd/database/db2sdk/db2sdk.go index 5760cd50a8..7d17f0c854 100644 --- a/coderd/database/db2sdk/db2sdk.go +++ b/coderd/database/db2sdk/db2sdk.go @@ -20,6 +20,7 @@ import ( agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/externalauth/gitprovider" "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/coderd/rbac/policy" @@ -522,7 +523,7 @@ func WorkspaceAgent(derpMap *tailcfg.DERPMap, coordinator tailnet.Coordinator, } } - status := dbAgent.Status(agentInactiveDisconnectTimeout) + status := dbAgent.Status(dbtime.Now(), agentInactiveDisconnectTimeout) workspaceAgent.Status = codersdk.WorkspaceAgentStatus(status.Status) workspaceAgent.FirstConnectedAt = status.FirstConnectedAt workspaceAgent.LastConnectedAt = status.LastConnectedAt diff --git a/coderd/database/modelmethods.go b/coderd/database/modelmethods.go index b03e3729fd..cf2b0e542a 100644 --- a/coderd/database/modelmethods.go +++ b/coderd/database/modelmethods.go @@ -15,7 +15,6 @@ import ( "golang.org/x/oauth2" "golang.org/x/xerrors" - "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/coderd/rbac/policy" ) @@ -624,7 +623,7 @@ type WorkspaceAgentConnectionStatus struct { DisconnectedAt *time.Time `json:"disconnected_at"` } -func (a WorkspaceAgent) Status(inactiveTimeout time.Duration) WorkspaceAgentConnectionStatus { +func (a WorkspaceAgent) Status(now time.Time, inactiveTimeout time.Duration) WorkspaceAgentConnectionStatus { connectionTimeout := time.Duration(a.ConnectionTimeoutSeconds) * time.Second status := WorkspaceAgentConnectionStatus{ @@ -643,7 +642,7 @@ func (a WorkspaceAgent) Status(inactiveTimeout time.Duration) WorkspaceAgentConn switch { case !a.FirstConnectedAt.Valid: switch { - case connectionTimeout > 0 && dbtime.Now().Sub(a.CreatedAt) > connectionTimeout: + case connectionTimeout > 0 && now.Sub(a.CreatedAt) > connectionTimeout: // If the agent took too long to connect the first time, // mark it as timed out. status.Status = WorkspaceAgentStatusTimeout @@ -658,7 +657,7 @@ func (a WorkspaceAgent) Status(inactiveTimeout time.Duration) WorkspaceAgentConn // If we've disconnected after our last connection, we know the // agent is no longer connected. status.Status = WorkspaceAgentStatusDisconnected - case dbtime.Now().Sub(a.LastConnectedAt.Time) > inactiveTimeout: + case now.Sub(a.LastConnectedAt.Time) > inactiveTimeout: // The connection died without updating the last connected. status.Status = WorkspaceAgentStatusDisconnected // Client code needs an accurate disconnected at if the agent has been inactive. diff --git a/coderd/prometheusmetrics/prometheusmetrics.go b/coderd/prometheusmetrics/prometheusmetrics.go index 76d7c5faaa..70b351cd3a 100644 --- a/coderd/prometheusmetrics/prometheusmetrics.go +++ b/coderd/prometheusmetrics/prometheusmetrics.go @@ -335,21 +335,43 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis go func() { defer close(done) defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - } + collect := func() { logger.Debug(ctx, "agent metrics collection is starting") timer := prometheus.NewTimer(metricsCollectorAgents) + defer func() { + logger.Debug(ctx, "agent metrics collection is done") + timer.ObserveDuration() + ticker.Reset(duration) + }() + derpMap := derpMapFn() + // Use a consistent value for now for the duration of this collection + // to avoid drift during the loop over workspaceAgents, which can cause + // incorrect reporting of agent connection status. + now := dbtime.Now() + workspaceAgents, err := db.GetWorkspaceAgentsForMetrics(ctx) if err != nil { logger.Error(ctx, "can't get workspace agents", slog.Error(err)) - goto done + return + } + + // Prepopulate our known agents and apps before processing, this saves us from having to make a database + // roundtrip for every iteration of the loop to get the list of apps for the current agent. + agentIDs := make([]uuid.UUID, 0, len(workspaceAgents)) + for _, agent := range workspaceAgents { + agentIDs = append(agentIDs, agent.WorkspaceAgent.ID) + } + allApps, err := db.GetWorkspaceAppsByAgentIDs(ctx, agentIDs) + if err != nil { + logger.Error(ctx, "can't get workspace apps", slog.Error(err)) + return + } + appsByAgentID := make(map[uuid.UUID][]database.WorkspaceApp, len(workspaceAgents)) + for _, app := range allApps { + appsByAgentID[app.AgentID] = append(appsByAgentID[app.AgentID], app) } for _, agent := range workspaceAgents { @@ -382,7 +404,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis observedFirstConnection[agent.WorkspaceAgent.ID] = struct{}{} } } - connectionStatus := agent.WorkspaceAgent.Status(agentInactiveDisconnectTimeout) + connectionStatus := agent.WorkspaceAgent.Status(now, agentInactiveDisconnectTimeout) node := (*coordinator.Load()).Node(agent.WorkspaceAgent.ID) tailnetNode := "unknown" @@ -420,13 +442,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis } // Collect information about registered applications - apps, err := db.GetWorkspaceAppsByAgentID(ctx, agent.WorkspaceAgent.ID) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - logger.Error(ctx, "can't get workspace apps", slog.F("agent_id", agent.WorkspaceAgent.ID), slog.Error(err)) - continue - } - - for _, app := range apps { + for _, app := range appsByAgentID[agent.WorkspaceAgent.ID] { agentsAppsGauge.WithLabelValues(VectorOperationAdd, 1, agent.WorkspaceAgent.Name, agent.OwnerUsername, agent.WorkspaceName, app.DisplayName, string(app.Health)) } } @@ -449,11 +465,15 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis agentsConnectionsGauge.Commit() agentsConnectionLatenciesGauge.Commit() agentsAppsGauge.Commit() + } - done: - logger.Debug(ctx, "agent metrics collection is done") - timer.ObserveDuration() - ticker.Reset(duration) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + collect() } }() return func() { diff --git a/coderd/workspaceapps/db.go b/coderd/workspaceapps/db.go index 02a4859cca..da4b8d6ef5 100644 --- a/coderd/workspaceapps/db.go +++ b/coderd/workspaceapps/db.go @@ -231,7 +231,7 @@ func (p *DBTokenProvider) Issue(ctx context.Context, rw http.ResponseWriter, r * } // Check that the agent is online. - agentStatus := dbReq.Agent.Status(p.WorkspaceAgentInactiveTimeout) + agentStatus := dbReq.Agent.Status(dbtime.Now(), p.WorkspaceAgentInactiveTimeout) if agentStatus.Status != database.WorkspaceAgentStatusConnected { WriteWorkspaceAppOffline(p.Logger, p.DashboardURL, rw, r, &appReq, fmt.Sprintf("Agent state is %q, not %q", agentStatus.Status, database.WorkspaceAgentStatusConnected)) return nil, "", false diff --git a/coderd/x/chatd/chattool/createworkspace.go b/coderd/x/chatd/chattool/createworkspace.go index c3c7ca75e3..c4bfe5c45c 100644 --- a/coderd/x/chatd/chattool/createworkspace.go +++ b/coderd/x/chatd/chattool/createworkspace.go @@ -14,6 +14,7 @@ import ( "cdr.dev/slog/v3" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/util/namesgenerator" "github.com/coder/coder/v2/coderd/x/chatd/internal/agentselect" "github.com/coder/coder/v2/codersdk" @@ -442,7 +443,7 @@ func (o CreateWorkspaceOptions) checkExistingWorkspace( ) selected = agents[0] } - status := selected.Status(agentInactiveDisconnectTimeout) + status := selected.Status(dbtime.Now(), agentInactiveDisconnectTimeout) result := map[string]any{ "created": false, "workspace_name": ws.Name, diff --git a/scaletest/workspaceupdates/run.go b/scaletest/workspaceupdates/run.go index a310bd646a..4f2464d5e6 100644 --- a/scaletest/workspaceupdates/run.go +++ b/scaletest/workspaceupdates/run.go @@ -75,10 +75,18 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error { return xerrors.Errorf("create user: %w", err) } newUser := newUserAndToken.User - newUserClient := codersdk.New(r.client.URL, - codersdk.WithSessionToken(newUserAndToken.SessionToken), - codersdk.WithLogger(logger), - codersdk.WithLogBodies()) + // Create a user client with an independent HTTP transport cloned from the + // runner's client. Using codersdk.New directly would inherit + // http.DefaultTransport, which is shared across all runners. That causes + // all user WebSocket connections to reuse the same TCP connection pool and + // land on the same coderd replica, concentrating load. + newUserClient, err := loadtestutil.DupClientCopyingHeaders(r.client, nil) + if err != nil { + return xerrors.Errorf("create user client: %w", err) + } + newUserClient.SetSessionToken(newUserAndToken.SessionToken) + newUserClient.SetLogger(logger) + newUserClient.SetLogBodies(true) logger.Info(ctx, fmt.Sprintf("user %q created", newUser.Username), slog.F("id", newUser.ID.String()))