package coderd

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"slices"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/google/uuid"
	"github.com/sqlc-dev/pqtype"
	"golang.org/x/exp/maps"
	"golang.org/x/sync/errgroup"
	"golang.org/x/xerrors"
	"tailscale.com/tailcfg"

	"cdr.dev/slog/v3"

	"github.com/coder/coder/v2/coderd/agentapi"
	"github.com/coder/coder/v2/coderd/agentapi/metadatabatcher"
	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/database/db2sdk"
	"github.com/coder/coder/v2/coderd/database/dbauthz"
	"github.com/coder/coder/v2/coderd/database/dbtime"
	"github.com/coder/coder/v2/coderd/externalauth"
	"github.com/coder/coder/v2/coderd/httpapi"
	"github.com/coder/coder/v2/coderd/httpmw"
	"github.com/coder/coder/v2/coderd/httpmw/loggermw"
	"github.com/coder/coder/v2/coderd/jwtutils"
	"github.com/coder/coder/v2/coderd/prebuilds"
	"github.com/coder/coder/v2/coderd/rbac"
	"github.com/coder/coder/v2/coderd/rbac/policy"
	"github.com/coder/coder/v2/coderd/telemetry"
	maputil "github.com/coder/coder/v2/coderd/util/maps"
	"github.com/coder/coder/v2/coderd/wspubsub"
	"github.com/coder/coder/v2/coderd/x/chatd"
	"github.com/coder/coder/v2/coderd/x/chatd/chatprompt"
	"github.com/coder/coder/v2/coderd/x/gitsync"
	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
	"github.com/coder/coder/v2/codersdk/workspacesdk"
	"github.com/coder/coder/v2/codersdk/wsjson"
	"github.com/coder/coder/v2/tailnet"
	"github.com/coder/coder/v2/tailnet/proto"
	"github.com/coder/websocket"
)

// workspaceAgent returns a single workspace agent together with its apps
// (including app statuses), scripts and log sources.
//
// @Summary Get workspace agent by ID
// @ID get-workspace-agent-by-id
// @Security CoderSessionToken
// @Produce json
// @Tags Agents
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
// @Success 200 {object} codersdk.WorkspaceAgent
// @Router /api/v2/workspaceagents/{workspaceagent} [get]
func (api *API) workspaceAgent(rw http.ResponseWriter, r *http.Request) {
	var (
		ctx        = r.Context()
		waws       = httpmw.WorkspaceAgentAndWorkspaceParam(r)
		dbApps     []database.WorkspaceApp
		scripts    []database.GetWorkspaceAgentScriptsByAgentIDsRow
		logSources []database.WorkspaceAgentLogSource
	)

	// The three lookups are independent of each other, so run them
	// concurrently; each goroutine writes only its own result variable.
	var eg errgroup.Group
	eg.Go(func() (err error) {
		dbApps, err = api.Database.GetWorkspaceAppsByAgentID(ctx, waws.WorkspaceAgent.ID)
		return err
	})
	eg.Go(func() (err error) {
		//nolint:gocritic // TODO: can we make this not require system restricted?
		scripts, err = api.Database.GetWorkspaceAgentScriptsByAgentIDs(dbauthz.AsSystemRestricted(ctx), []uuid.UUID{waws.WorkspaceAgent.ID})
		return err
	})
	eg.Go(func() (err error) {
		//nolint:gocritic // TODO: can we make this not require system restricted?
		logSources, err = api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(dbauthz.AsSystemRestricted(ctx), []uuid.UUID{waws.WorkspaceAgent.ID})
		return err
	})
	err := eg.Wait()
	if httpapi.Is404Error(err) {
		httpapi.ResourceNotFound(rw)
		return
	}
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching workspace agent.",
			Detail:  err.Error(),
		})
		return
	}

	// Non-nil, pre-sized slice of the app IDs whose statuses we need.
	appIDs := make([]uuid.UUID, 0, len(dbApps))
	for _, app := range dbApps {
		appIDs = append(appIDs, app.ID)
	}
	// nolint:gocritic // This is a system restricted operation.
	statuses, err := api.Database.GetWorkspaceAppStatusesByAppIDs(dbauthz.AsSystemRestricted(ctx), appIDs)
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error fetching workspace app statuses.",
			Detail:  err.Error(),
		})
		return
	}

	apiAgent, err := db2sdk.WorkspaceAgent(
		api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent,
		db2sdk.Apps(dbApps, statuses, waws.WorkspaceAgent, waws.OwnerUsername, waws.WorkspaceTable),
		convertScripts(scripts), convertLogSources(logSources),
		api.AgentInactiveDisconnectTimeout,
		api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
	)
	if err != nil {
		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
			Message: "Internal error reading workspace agent.",
			Detail:  err.Error(),
		})
		return
	}

	httpapi.Write(ctx, rw, http.StatusOK, apiAgent)
}

// AgentAPIVersionREST is a version string for the REST-based agent API.
// NOTE(review): its consumers are outside this view.
const AgentAPIVersionREST = "1.0"

// patchWorkspaceAgentLogs appends a batch of log lines uploaded by the
// authenticated agent. Requests without a log source ID are attributed to the
// "External" log source, which is created on first use. When the agent's log
// limit is hit, the agent is flagged as overflowed and 413 is returned.
//
// @Summary Patch workspace agent logs
// @ID patch-workspace-agent-logs
// @Security CoderSessionToken
// @Accept json
// @Produce json
// @Tags Agents
// @Param request body agentsdk.PatchLogs true "logs"
// @Success 200 {object} codersdk.Response
// @Router /api/v2/workspaceagents/me/logs [patch]
func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	workspaceAgent := httpmw.WorkspaceAgent(r)

	var req agentsdk.PatchLogs
	if !httpapi.Read(ctx, rw, r, &req) {
		return
	}
	if len(req.Logs) == 0 {
		httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
			Message: "No logs provided.",
		})
		return
	}

	// This is to support the legacy API where the log source ID was
	// not provided in the request body. We default to the external
	// log source in this case.
	if req.LogSourceID == uuid.Nil {
		// Use the external log source
		externalSources, err := api.Database.InsertWorkspaceAgentLogSources(ctx, database.InsertWorkspaceAgentLogSourcesParams{
			WorkspaceAgentID: workspaceAgent.ID,
			CreatedAt:        dbtime.Now(),
			ID:               []uuid.UUID{agentsdk.ExternalLogSourceID},
			DisplayName:      []string{"External"},
			Icon:             []string{"/emojis/1f310.png"},
		})
		if database.IsUniqueViolation(err, database.UniqueWorkspaceAgentLogSourcesPkey) {
			// The external source already exists for this agent; reuse it.
			err = nil
			req.LogSourceID = agentsdk.ExternalLogSourceID
		}
		if err != nil {
			httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
				Message: "Failed to create external log source.",
				Detail:  err.Error(),
			})
			return
		}
		if len(externalSources) == 1 {
			req.LogSourceID = externalSources[0].ID
		}
	}

	// One output/level entry per request log, so both slices have a known size.
	output := make([]string, 0, len(req.Logs))
	level := make([]database.LogLevel, 0, len(req.Logs))
	outputLength := 0
	for _, logEntry := range req.Logs {
		sanitizedOutput := agentsdk.SanitizeLogOutput(logEntry.Output)
		output = append(output, sanitizedOutput)
		outputLength += len(sanitizedOutput)
		if logEntry.Level == "" {
			// Default to "info" to support older agents that didn't have the level field.
			logEntry.Level = codersdk.LogLevelInfo
		}
		parsedLevel := database.LogLevel(logEntry.Level)
		if !parsedLevel.Valid() {
			httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
				Message: "Invalid log level provided.",
				Detail:  fmt.Sprintf("invalid log level: %q", logEntry.Level),
			})
			return
		}
		level = append(level, parsedLevel)
	}

	logs, err := api.Database.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{
		AgentID:     workspaceAgent.ID,
		CreatedAt:   dbtime.Now(),
		Output:      output,
		Level:       level,
		LogSourceID: req.LogSourceID,
		// #nosec G115 - Log output length is limited and fits in int32
		OutputLength: int32(outputLength),
	})
	if err != nil {
		if !database.IsWorkspaceAgentLogsLimitError(err) {
			httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
				Message: "Failed to upload logs",
				Detail:  err.Error(),
			})
			return
		}
		// Already flagged as overflowed: no need to update or notify again.
		if workspaceAgent.LogsOverflowed {
			httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
				Message: "Logs limit exceeded",
				Detail:  err.Error(),
			})
			return
		}
		err := api.Database.UpdateWorkspaceAgentLogOverflowByID(ctx, database.UpdateWorkspaceAgentLogOverflowByIDParams{
			ID:             workspaceAgent.ID,
			LogsOverflowed: true,
		})
		if err != nil {
			// We don't want to return here, because the agent will retry
			// on failure and this isn't a huge deal. The overflow state
			// is just a hint to the user that the logs are incomplete.
			api.Logger.Warn(ctx, "failed to update workspace agent log overflow", slog.Error(err))
		}

		workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
		if err != nil {
			httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
				Message: "Failed to get workspace.",
				Detail:  err.Error(),
			})
			return
		}

		api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
			Kind:        wspubsub.WorkspaceEventKindAgentLogsOverflow,
			WorkspaceID: workspace.ID,
			AgentID:     &workspaceAgent.ID,
		})

		httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
			Message: "Logs limit exceeded",
		})
		return
	}

	lowestLogID := logs[0].ID

	// Publish by the lowest log ID inserted so the
	// log stream will fetch everything from that point.
	api.publishWorkspaceAgentLogsUpdate(ctx, workspaceAgent.ID, agentsdk.LogsNotifyMessage{
		CreatedAfter: lowestLogID - 1,
	})

	if workspaceAgent.LogsLength == 0 {
		// If these are the first logs being appended, we publish a UI update
		// to notify the UI that logs are now available. The logs have already
		// been stored at this point, so a failure here is only logged: failing
		// the request would report a successful upload as failed.
		workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
		if err != nil {
			api.Logger.Warn(ctx, "failed to get workspace to publish first agent logs update",
				slog.F("workspace_agent_id", workspaceAgent.ID), slog.Error(err))
		} else {
			api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
				Kind:        wspubsub.WorkspaceEventKindAgentFirstLogs,
				WorkspaceID: workspace.ID,
				AgentID:     &workspaceAgent.ID,
			})
		}
	}

	httpapi.Write(ctx, rw, http.StatusOK, nil)
}

// patchWorkspaceAgentAppStatus is the legacy REST entry point for reporting an
// app status; it translates the request and delegates to agentapi.AppsAPI.
//
// @Summary Patch workspace agent app status
// @ID patch-workspace-agent-app-status
// @Security CoderSessionToken
// @Accept json
// @Produce json
// @Tags Agents
// @Param request body agentsdk.PatchAppStatus true "app status"
// @Success 200 {object} codersdk.Response
// @Router /api/v2/workspaceagents/me/app-status [patch]
// @Deprecated Use UpdateAppStatus on the Agent API instead.
func (api *API) patchWorkspaceAgentAppStatus(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() workspaceAgent := httpmw.WorkspaceAgent(r) var req agentsdk.PatchAppStatus if !httpapi.Read(ctx, rw, r, &req) { return } workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Failed to get workspace.", Detail: err.Error(), }) return } // This functionality has been moved to the AppsAPI in the agentapi. We keep this HTTP handler around for back // compatibility with older agents. We'll translate the request into the protobuf so there is only one primary // implementation. cachedWs := &agentapi.CachedWorkspaceFields{} cachedWs.UpdateValues(workspace) appAPI := &agentapi.AppsAPI{ AgentID: workspaceAgent.ID, Database: api.Database, Log: api.Logger, Workspace: cachedWs, AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) { return api.Database.GetWorkspaceAgentByID(ctx, workspaceAgent.ID) }, PublishWorkspaceUpdateFn: func(ctx context.Context, agentID uuid.UUID, kind wspubsub.WorkspaceEventKind) error { api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{ Kind: kind, WorkspaceID: workspace.ID, AgentID: &agentID, }) return nil }, NotificationsEnqueuer: api.NotificationsEnqueuer, Clock: api.Clock, } protoReq, err := agentsdk.ProtoFromPatchAppStatus(req) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Failed to parse request.", Detail: err.Error(), }) return } _, err = appAPI.UpdateAppStatus(r.Context(), protoReq) if err != nil { sdkErr := new(codersdk.Error) if xerrors.As(err, &sdkErr) { httpapi.Write(ctx, rw, sdkErr.StatusCode(), sdkErr.Response) return } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to update app status.", Detail: err.Error(), }) return } httpapi.Write(ctx, rw, http.StatusOK, nil) } // workspaceAgentLogs returns 
the logs associated with a workspace agent // // @Summary Get logs by workspace agent // @ID get-logs-by-workspace-agent // @Security CoderSessionToken // @Produce json // @Tags Agents // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Param before query int false "Before log id" // @Param after query int false "After log id" // @Param follow query bool false "Follow log stream" // @Param no_compression query bool false "Disable compression for WebSocket connection" // @Param format query string false "Log output format. Accepted: 'json' (default), 'text' (plain text with RFC3339 timestamps and ANSI colors). Not supported with follow=true." Enums(json,text) // @Success 200 {array} codersdk.WorkspaceAgentLog // @Router /api/v2/workspaceagents/{workspaceagent}/logs [get] func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { // This mostly copies how provisioner job logs are streamed! var ( ctx = r.Context() waws = httpmw.WorkspaceAgentAndWorkspaceParam(r) logger = api.Logger.With(slog.F("workspace_agent_id", waws.WorkspaceAgent.ID)) follow = r.URL.Query().Has("follow") afterRaw = r.URL.Query().Get("after") noCompression = r.URL.Query().Has("no_compression") format = r.URL.Query().Get("format") ) // Validate format parameter. if format == "" { format = "json" } if format != "json" && format != "text" { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Invalid format parameter.", Detail: "Allowed values are \"json\" and \"text\".", }) return } // Text format is not supported with streaming. if format == "text" && follow { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Text format is not supported with follow mode.", Detail: "Use format=json or omit the follow parameter.", }) return } var after int64 // Only fetch logs created after the time provided. 
if afterRaw != "" { var err error after, err = strconv.ParseInt(afterRaw, 10, 64) if err != nil || after < 0 { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Query param \"after\" must be an integer greater than or equal to zero.", Validations: []codersdk.ValidationError{ {Field: "after", Detail: "Must be an integer greater than or equal to zero"}, }, }) return } } logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ AgentID: waws.WorkspaceAgent.ID, CreatedAfter: after, }) if errors.Is(err, sql.ErrNoRows) { err = nil } if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error fetching provisioner logs.", Detail: err.Error(), }) return } if logs == nil { logs = []database.WorkspaceAgentLog{} } if !follow { if format == "text" { sids, err := api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(ctx, []uuid.UUID{waws.WorkspaceAgent.ID}) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error fetching workspace agent log sources.", Detail: err.Error(), }) return } lsids := make(map[uuid.UUID]string, len(sids)) for _, sid := range sids { lsids[sid.ID] = sid.DisplayName } rw.Header().Set("Content-Type", "text/plain; charset=utf-8") rw.WriteHeader(http.StatusOK) for _, log := range logs { _, _ = rw.Write([]byte(db2sdk.WorkspaceAgentLog(log).Text(waws.WorkspaceAgent.Name, lsids[log.LogSourceID]))) _, _ = rw.Write([]byte("\n")) } return } httpapi.Write(ctx, rw, http.StatusOK, convertWorkspaceAgentLogs(logs)) return } api.WebsocketWaitMutex.Lock() api.WebsocketWaitGroup.Add(1) api.WebsocketWaitMutex.Unlock() defer api.WebsocketWaitGroup.Done() opts := &websocket.AcceptOptions{} // Allow client to request no compression. This is useful for buggy // clients or if there's a client/server incompatibility. This is // needed with e.g. coder/websocket and Safari (confirmed in 16.5). 
// // See: // * https://github.com/nhooyr/websocket/issues/218 // * https://github.com/gobwas/ws/issues/169 if noCompression { opts.CompressionMode = websocket.CompressionDisabled } conn, err := websocket.Accept(rw, r, opts) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Failed to accept websocket.", Detail: err.Error(), }) return } ctx, cancel := context.WithCancel(ctx) defer cancel() go httpapi.HeartbeatClose(ctx, api.Logger, cancel, conn) encoder := wsjson.NewEncoder[[]codersdk.WorkspaceAgentLog](conn, websocket.MessageText) defer encoder.Close(websocket.StatusNormalClosure) err = encoder.Encode(convertWorkspaceAgentLogs(logs)) if err != nil { return } lastSentLogID := after if len(logs) > 0 { lastSentLogID = logs[len(logs)-1].ID } workspaceNotifyCh := make(chan struct{}, 1) notifyCh := make(chan struct{}, 1) // Allow us to immediately check if we missed any logs // between initial fetch and subscribe. notifyCh <- struct{}{} // Subscribe to workspace to detect new builds. closeSubscribeWorkspace, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(waws.WorkspaceTable.OwnerID), wspubsub.HandleWorkspaceEvent( func(_ context.Context, e wspubsub.WorkspaceEvent, err error) { if err != nil { return } if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == waws.WorkspaceTable.ID { select { case workspaceNotifyCh <- struct{}{}: default: } } })) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to subscribe to workspace for log streaming.", Detail: err.Error(), }) return } defer closeSubscribeWorkspace() // Subscribe early to prevent missing log events. closeSubscribe, err := api.Pubsub.Subscribe(agentsdk.LogsNotifyChannel(waws.WorkspaceAgent.ID), func(_ context.Context, _ []byte) { // The message is not important, we're tracking lastSentLogID manually. 
select { case notifyCh <- struct{}{}: default: } }) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to subscribe to agent for log streaming.", Detail: err.Error(), }) return } defer closeSubscribe() // Buffer size controls the log prefetch capacity. bufferedLogs := make(chan []database.WorkspaceAgentLog, 8) // Check at least once per minute in case we didn't receive a pubsub message. recheckInterval := time.Minute t := time.NewTicker(recheckInterval) defer t.Stop() // Log the request immediately instead of after it completes. if rl := loggermw.RequestLoggerFromContext(ctx); rl != nil { rl.WriteLog(ctx, http.StatusAccepted) } go func() { defer func() { logger.Debug(ctx, "end log streaming loop") close(bufferedLogs) }() logger.Debug(ctx, "start log streaming loop", slog.F("last_sent_log_id", lastSentLogID)) keepGoing := true for keepGoing { var ( debugTriggeredBy string onlyCheckLatestBuild bool ) select { case <-ctx.Done(): return case <-t.C: debugTriggeredBy = "timer" case <-workspaceNotifyCh: debugTriggeredBy = "workspace" onlyCheckLatestBuild = true case <-notifyCh: debugTriggeredBy = "log" t.Reset(recheckInterval) } agents, err := api.Database.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, waws.WorkspaceTable.ID) if err != nil && !xerrors.Is(err, sql.ErrNoRows) { if xerrors.Is(err, context.Canceled) { return } logger.Warn(ctx, "failed to get workspace agents in latest build", slog.Error(err)) continue } // If the agent is no longer in the latest build, we can stop after // checking once. 
keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == waws.WorkspaceAgent.ID }) logger.Debug( ctx, "checking for new logs", slog.F("triggered_by", debugTriggeredBy), slog.F("only_check_latest_build", onlyCheckLatestBuild), slog.F("keep_going", keepGoing), slog.F("last_sent_log_id", lastSentLogID), slog.F("workspace_has_agents", len(agents) > 0), ) if onlyCheckLatestBuild && keepGoing { continue } logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ AgentID: waws.WorkspaceAgent.ID, CreatedAfter: lastSentLogID, }) if err != nil { if xerrors.Is(err, context.Canceled) { return } logger.Warn(ctx, "failed to get workspace agent logs after", slog.Error(err)) continue } if len(logs) == 0 { // Just keep listening - more logs might come in the future! continue } select { case <-ctx.Done(): return case bufferedLogs <- logs: lastSentLogID = logs[len(logs)-1].ID } } }() defer func() { // Ensure that we don't return until the goroutine has exited. //nolint:revive // Consume channel to wait until it's closed. 
for range bufferedLogs { } }() for { select { case <-ctx.Done(): logger.Debug(ctx, "job logs context canceled") return case logs, ok := <-bufferedLogs: if !ok { select { case <-ctx.Done(): logger.Debug(ctx, "job logs context canceled") default: logger.Debug(ctx, "reached the end of published logs") } return } err = encoder.Encode(convertWorkspaceAgentLogs(logs)) if err != nil { return } } } } // @Summary Get listening ports for workspace agent // @ID get-listening-ports-for-workspace-agent // @Security CoderSessionToken // @Produce json // @Tags Agents // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Success 200 {object} codersdk.WorkspaceAgentListeningPortsResponse // @Router /api/v2/workspaceagents/{workspaceagent}/listening-ports [get] func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() waws := httpmw.WorkspaceAgentAndWorkspaceParam(r) // If the agent is unreachable, the request will hang. Assume that if we // don't get a response after 30s that the agent is unreachable. 
ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() apiAgent, err := db2sdk.WorkspaceAgent( api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), ) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error reading workspace agent.", Detail: err.Error(), }) return } if apiAgent.Status != codersdk.WorkspaceAgentConnected { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), }) return } agentConn, release, err := api.agentProvider.AgentConn(ctx, waws.WorkspaceAgent.ID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error dialing workspace agent.", Detail: err.Error(), }) return } defer release() portsResponse, err := agentConn.ListeningPorts(ctx) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error fetching listening ports.", Detail: err.Error(), }) return } // Get a list of ports that are in-use by applications. 
apps, err := api.Database.GetWorkspaceAppsByAgentID(ctx, waws.WorkspaceAgent.ID) if xerrors.Is(err, sql.ErrNoRows) { apps = []database.WorkspaceApp{} err = nil } if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error fetching workspace apps.", Detail: err.Error(), }) return } appPorts := make(map[uint16]struct{}, len(apps)) for _, app := range apps { if !app.Url.Valid || app.Url.String == "" { continue } u, err := url.Parse(app.Url.String) if err != nil { continue } port := u.Port() if port == "" { continue } portNum, err := strconv.ParseUint(port, 10, 16) if err != nil { continue } if portNum < 1 || portNum > 65535 { continue } appPorts[uint16(portNum)] = struct{}{} } // Filter out ports that are globally blocked, in-use by applications, or // common non-HTTP ports such as databases, FTP, SSH, etc. filteredPorts := make([]codersdk.WorkspaceAgentListeningPort, 0, len(portsResponse.Ports)) for _, port := range portsResponse.Ports { if port.Port < workspacesdk.AgentMinimumListeningPort { continue } if _, ok := appPorts[port.Port]; ok { continue } if _, ok := workspacesdk.AgentIgnoredListeningPorts[port.Port]; ok { continue } filteredPorts = append(filteredPorts, port) } portsResponse.Ports = filteredPorts httpapi.Write(ctx, rw, http.StatusOK, portsResponse) } // @Summary Watch workspace agent for container updates. 
// @ID watch-workspace-agent-for-container-updates // @Security CoderSessionToken // @Produce json // @Tags Agents // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Success 200 {object} codersdk.WorkspaceAgentListContainersResponse // @Router /api/v2/workspaceagents/{workspaceagent}/containers/watch [get] func (api *API) watchWorkspaceAgentContainers(rw http.ResponseWriter, r *http.Request) { var ( ctx = r.Context() waws = httpmw.WorkspaceAgentAndWorkspaceParam(r) logger = api.Logger.Named("agent_container_watcher").With(slog.F("agent_id", waws.WorkspaceAgent.ID)) ) // If the agent is unreachable, the request will hang. Assume that if we // don't get a response after 30s that the agent is unreachable. dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second) defer dialCancel() apiAgent, err := db2sdk.WorkspaceAgent( api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), ) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error reading workspace agent.", Detail: err.Error(), }) return } if apiAgent.Status != codersdk.WorkspaceAgentConnected { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), }) return } agentConn, release, err := api.agentProvider.AgentConn(dialCtx, waws.WorkspaceAgent.ID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error dialing workspace agent.", Detail: err.Error(), }) return } defer release() containersCh, closer, err := agentConn.WatchContainers(ctx, logger) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error watching agent's containers.", Detail: err.Error(), }) 
return } defer closer.Close() conn, err := websocket.Accept(rw, r, nil) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to upgrade connection to websocket.", Detail: err.Error(), }) return } ctx, cancel := context.WithCancel(r.Context()) defer cancel() // Here we close the websocket for reading, so that the websocket library will handle pings and // close frames. _ = conn.CloseRead(context.Background()) ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageText) defer wsNetConn.Close() go httpapi.HeartbeatCloseWithClock(ctx, logger, cancel, conn, api.Clock) encoder := json.NewEncoder(wsNetConn) for { select { case <-api.ctx.Done(): return case <-ctx.Done(): return case containers, ok := <-containersCh: if !ok { return } if err := encoder.Encode(containers); err != nil { api.Logger.Error(ctx, "encode containers", slog.Error(err)) return } } } } // @Summary Get running containers for workspace agent // @ID get-running-containers-for-workspace-agent // @Security CoderSessionToken // @Produce json // @Tags Agents // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Param label query string true "Labels" format(key=value) // @Success 200 {object} codersdk.WorkspaceAgentListContainersResponse // @Router /api/v2/workspaceagents/{workspaceagent}/containers [get] func (api *API) workspaceAgentListContainers(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() waws := httpmw.WorkspaceAgentAndWorkspaceParam(r) labelParam, ok := r.URL.Query()["label"] if !ok { labelParam = []string{} } labels := make(map[string]string, len(labelParam)/2) for _, label := range labelParam { kvs := strings.Split(label, "=") if len(kvs) != 2 { httpapi.Write(r.Context(), rw, http.StatusBadRequest, codersdk.Response{ Message: "Invalid label format", Detail: "Labels must be in the format key=value", }) return } labels[kvs[0]] = kvs[1] } // If the agent is unreachable, the request will 
hang. Assume that if we // don't get a response after 30s that the agent is unreachable. ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() apiAgent, err := db2sdk.WorkspaceAgent( api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), ) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error reading workspace agent.", Detail: err.Error(), }) return } if apiAgent.Status != codersdk.WorkspaceAgentConnected { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), }) return } agentConn, release, err := api.agentProvider.AgentConn(ctx, waws.WorkspaceAgent.ID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error dialing workspace agent.", Detail: err.Error(), }) return } defer release() // Get a list of containers that the agent is able to detect cts, err := agentConn.ListContainers(ctx) if err != nil { if errors.Is(err, context.Canceled) { httpapi.Write(ctx, rw, http.StatusRequestTimeout, codersdk.Response{ Message: "Failed to fetch containers from agent.", Detail: "Request timed out.", }) return } // If the agent returns a codersdk.Error, we can return that directly. 
if cerr, ok := codersdk.AsError(err); ok { httpapi.Write(ctx, rw, cerr.StatusCode(), cerr.Response) return } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error fetching containers.", Detail: err.Error(), }) return } // Filter in-place by labels cts.Containers = slices.DeleteFunc(cts.Containers, func(ct codersdk.WorkspaceAgentContainer) bool { return !maputil.Subset(labels, ct.Labels) }) httpapi.Write(ctx, rw, http.StatusOK, cts) } // @Summary Delete devcontainer for workspace agent // @ID delete-devcontainer-for-workspace-agent // @Security CoderSessionToken // @Tags Agents // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Param devcontainer path string true "Devcontainer ID" // @Success 204 // @Router /api/v2/workspaceagents/{workspaceagent}/containers/devcontainers/{devcontainer} [delete] func (api *API) workspaceAgentDeleteDevcontainer(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() waws := httpmw.WorkspaceAgentAndWorkspaceParam(r) if !api.Authorize(r, policy.ActionUpdate, waws.WorkspaceTable) { httpapi.Forbidden(rw) return } devcontainer := chi.URLParam(r, "devcontainer") if devcontainer == "" { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Devcontainer ID is required.", Validations: []codersdk.ValidationError{ {Field: "devcontainer", Detail: "Devcontainer ID is required."}, }, }) return } apiAgent, err := db2sdk.WorkspaceAgent( api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), ) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error reading workspace agent.", Detail: err.Error(), }) return } if apiAgent.Status != codersdk.WorkspaceAgentConnected { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: fmt.Sprintf("Agent state is %q, 
it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), }) return } // If the agent is unreachable, the request will hang. Assume that if we // don't get a response after 30s that the agent is unreachable. dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second) defer dialCancel() agentConn, release, err := api.agentProvider.AgentConn(dialCtx, waws.WorkspaceAgent.ID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error dialing workspace agent.", Detail: err.Error(), }) return } defer release() if err = agentConn.DeleteDevcontainer(ctx, devcontainer); err != nil { if errors.Is(err, context.Canceled) { httpapi.Write(ctx, rw, http.StatusRequestTimeout, codersdk.Response{ Message: "Failed to delete devcontainer from agent.", Detail: "Request timed out.", }) return } // If the agent returns a codersdk.Error, we can return that directly. if cerr, ok := codersdk.AsError(err); ok { httpapi.Write(ctx, rw, cerr.StatusCode(), cerr.Response) return } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error deleting devcontainer.", Detail: err.Error(), }) return } httpapi.Write(ctx, rw, http.StatusNoContent, nil) } // @Summary Recreate devcontainer for workspace agent // @ID recreate-devcontainer-for-workspace-agent // @Security CoderSessionToken // @Tags Agents // @Produce json // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Param devcontainer path string true "Devcontainer ID" // @Success 202 {object} codersdk.Response // @Router /api/v2/workspaceagents/{workspaceagent}/containers/devcontainers/{devcontainer}/recreate [post] func (api *API) workspaceAgentRecreateDevcontainer(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() waws := httpmw.WorkspaceAgentAndWorkspaceParam(r) if !api.Authorize(r, policy.ActionUpdate, waws.WorkspaceTable) { httpapi.Forbidden(rw) return } devcontainer := chi.URLParam(r, 
"devcontainer") if devcontainer == "" { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Devcontainer ID is required.", Validations: []codersdk.ValidationError{ {Field: "devcontainer", Detail: "Devcontainer ID is required."}, }, }) return } apiAgent, err := db2sdk.WorkspaceAgent( api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout, api.DeploymentValues.AgentFallbackTroubleshootingURL.String(), ) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error reading workspace agent.", Detail: err.Error(), }) return } if apiAgent.Status != codersdk.WorkspaceAgentConnected { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected), }) return } // If the agent is unreachable, the request will hang. Assume that if we // don't get a response after 30s that the agent is unreachable. dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second) defer dialCancel() agentConn, release, err := api.agentProvider.AgentConn(dialCtx, waws.WorkspaceAgent.ID) if err != nil { httpapi.Write(dialCtx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error dialing workspace agent.", Detail: err.Error(), }) return } defer release() m, err := agentConn.RecreateDevcontainer(ctx, devcontainer) if err != nil { if errors.Is(err, context.Canceled) { httpapi.Write(ctx, rw, http.StatusRequestTimeout, codersdk.Response{ Message: "Failed to recreate devcontainer from agent.", Detail: "Request timed out.", }) return } // If the agent returns a codersdk.Error, we can return that directly. 
if cerr, ok := codersdk.AsError(err); ok { httpapi.Write(ctx, rw, cerr.StatusCode(), cerr.Response) return } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error recreating devcontainer.", Detail: err.Error(), }) return } httpapi.Write(ctx, rw, http.StatusAccepted, m) } // @Summary Get connection info for workspace agent // @ID get-connection-info-for-workspace-agent // @Security CoderSessionToken // @Produce json // @Tags Agents // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Success 200 {object} workspacesdk.AgentConnectionInfo // @Router /api/v2/workspaceagents/{workspaceagent}/connection [get] func (api *API) workspaceAgentConnection(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() httpapi.Write(ctx, rw, http.StatusOK, workspacesdk.AgentConnectionInfo{ DERPMap: api.DERPMap(), DERPForceWebSockets: api.DeploymentValues.DERP.Config.ForceWebSockets.Value(), DisableDirectConnections: api.DeploymentValues.DERP.Config.BlockDirect.Value(), HostnameSuffix: api.DeploymentValues.WorkspaceHostnameSuffix.Value(), }) } // workspaceAgentConnectionGeneric is the same as workspaceAgentConnection but // without the workspaceagent path parameter. 
// // @Summary Get connection info for workspace agent generic // @ID get-connection-info-for-workspace-agent-generic // @Security CoderSessionToken // @Produce json // @Tags Agents // @Success 200 {object} workspacesdk.AgentConnectionInfo // @Router /api/v2/workspaceagents/connection [get] // @x-apidocgen {"skip": true} func (api *API) workspaceAgentConnectionGeneric(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() httpapi.Write(ctx, rw, http.StatusOK, workspacesdk.AgentConnectionInfo{ DERPMap: api.DERPMap(), DERPForceWebSockets: api.DeploymentValues.DERP.Config.ForceWebSockets.Value(), DisableDirectConnections: api.DeploymentValues.DERP.Config.BlockDirect.Value(), HostnameSuffix: api.DeploymentValues.WorkspaceHostnameSuffix.Value(), }) } // @Summary Get DERP map updates // @ID get-derp-map-updates // @Security CoderSessionToken // @Tags Agents // @Success 101 // @Router /api/v2/derp-map [get] func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() api.WebsocketWaitMutex.Lock() api.WebsocketWaitGroup.Add(1) api.WebsocketWaitMutex.Unlock() defer api.WebsocketWaitGroup.Done() ws, err := websocket.Accept(rw, r, nil) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Failed to accept websocket.", Detail: err.Error(), }) return } encoder := wsjson.NewEncoder[*tailcfg.DERPMap](ws, websocket.MessageBinary) defer encoder.Close(websocket.StatusGoingAway) // Log the request immediately instead of after it completes. if rl := loggermw.RequestLoggerFromContext(ctx); rl != nil { rl.WriteLog(ctx, http.StatusAccepted) } go func(ctx context.Context) { // TODO(mafredri): Is this too frequent? Use separate ping disconnect timeout? 
t := time.NewTicker(api.AgentConnectionUpdateFrequency) defer t.Stop() for { select { case <-t.C: case <-ctx.Done(): return } ctx, cancel := context.WithTimeout(ctx, 30*time.Second) err := ws.Ping(ctx) cancel() if err != nil { _ = ws.Close(websocket.StatusGoingAway, "ping failed") return } } }(ctx) ticker := time.NewTicker(api.Options.DERPMapUpdateFrequency) defer ticker.Stop() var lastDERPMap *tailcfg.DERPMap for { derpMap := api.DERPMap() if lastDERPMap == nil || !tailnet.CompareDERPMaps(lastDERPMap, derpMap) { err := encoder.Encode(derpMap) if err != nil { return } lastDERPMap = derpMap } ticker.Reset(api.Options.DERPMapUpdateFrequency) select { case <-ctx.Done(): return case <-api.ctx.Done(): return case <-ticker.C: } } } // workspaceAgentClientCoordinate accepts a WebSocket that reads node network updates. // After accept a PubSub starts listening for new connection node updates // which are written to the WebSocket. // // @Summary Coordinate workspace agent // @ID coordinate-workspace-agent // @Security CoderSessionToken // @Tags Agents // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Success 101 // @Router /api/v2/workspaceagents/{workspaceagent}/coordinate [get] func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() // Ensure the database is reachable before proceeding. _, err := api.Database.Ping(ctx) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: codersdk.DatabaseNotReachable, Detail: err.Error(), }) return } // This route accepts user API key auth and workspace proxy auth. The moon actor has // full permissions so should be able to pass this authz check. waws := httpmw.WorkspaceAgentAndWorkspaceParam(r) if !api.Authorize(r, policy.ActionSSH, waws) { httpapi.ResourceNotFound(rw) return } // This is used by Enterprise code to control the functionality of this route. 
// Namely, disabling the route using `CODER_BROWSER_ONLY`. override := api.WorkspaceClientCoordinateOverride.Load() if override != nil { overrideFunc := *override if overrideFunc != nil && overrideFunc(rw) { return } } version := "1.0" qv := r.URL.Query().Get("version") if qv != "" { version = qv } if err := proto.CurrentVersion.Validate(version); err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Unknown or unsupported API version", Validations: []codersdk.ValidationError{ {Field: "version", Detail: err.Error()}, }, }) return } peerID, err := api.handleResumeToken(ctx, rw, r) if err != nil { // handleResumeToken has already written the response. return } api.WebsocketWaitMutex.Lock() api.WebsocketWaitGroup.Add(1) api.WebsocketWaitMutex.Unlock() defer api.WebsocketWaitGroup.Done() conn, err := websocket.Accept(rw, r, nil) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Failed to accept websocket.", Detail: err.Error(), }) return } ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) defer wsNetConn.Close() ctx, cancel := context.WithCancel(ctx) defer cancel() go httpapi.HeartbeatClose(ctx, api.Logger, cancel, conn) defer conn.Close(websocket.StatusNormalClosure, "") err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{ Name: "client", ID: peerID, Auth: tailnet.ClientCoordinateeAuth{ AgentID: waws.WorkspaceAgent.ID, }, }) if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) { _ = conn.Close(websocket.StatusInternalError, err.Error()) return } } // handleResumeToken accepts a resume_token query parameter to use the same peer ID func (api *API) handleResumeToken(ctx context.Context, rw http.ResponseWriter, r *http.Request) (peerID uuid.UUID, err error) { peerID = uuid.New() resumeToken := r.URL.Query().Get("resume_token") if resumeToken != "" { peerID, err = 
api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken) // If the token is missing the key ID, it's probably an old token in which // case we just want to generate a new peer ID. switch { case xerrors.Is(err, jwtutils.ErrMissingKeyID): peerID = uuid.New() err = nil case err != nil: httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{ Message: workspacesdk.CoordinateAPIInvalidResumeToken, Detail: err.Error(), Validations: []codersdk.ValidationError{ {Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken}, }, }) return peerID, err default: api.Logger.Debug(ctx, "accepted coordinate resume token for peer", slog.F("peer_id", peerID.String())) } } return peerID, err } // @Summary Post workspace agent log source // @ID post-workspace-agent-log-source // @Security CoderSessionToken // @Accept json // @Produce json // @Tags Agents // @Param request body agentsdk.PostLogSourceRequest true "Log source request" // @Success 200 {object} codersdk.WorkspaceAgentLogSource // @Router /api/v2/workspaceagents/me/log-source [post] func (api *API) workspaceAgentPostLogSource(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() var req agentsdk.PostLogSourceRequest if !httpapi.Read(ctx, rw, r, &req) { return } workspaceAgent := httpmw.WorkspaceAgent(r) sources, err := api.Database.InsertWorkspaceAgentLogSources(ctx, database.InsertWorkspaceAgentLogSourcesParams{ WorkspaceAgentID: workspaceAgent.ID, CreatedAt: dbtime.Now(), ID: []uuid.UUID{req.ID}, DisplayName: []string{req.DisplayName}, Icon: []string{req.Icon}, }) if err != nil { if database.IsUniqueViolation(err, "workspace_agent_log_sources_pkey") { httpapi.Write(ctx, rw, http.StatusCreated, codersdk.WorkspaceAgentLogSource{ WorkspaceAgentID: workspaceAgent.ID, CreatedAt: dbtime.Now(), ID: req.ID, DisplayName: req.DisplayName, Icon: req.Icon, }) return } httpapi.InternalServerError(rw, err) return } if len(sources) != 1 { httpapi.InternalServerError(rw, 
xerrors.Errorf("database should've returned 1 row, got %d", len(sources))) return } apiSource := convertLogSources(sources)[0] httpapi.Write(ctx, rw, http.StatusCreated, apiSource) } // @Summary Get workspace agent reinitialization // @ID get-workspace-agent-reinitialization // @Security CoderSessionToken // @Produce json // @Tags Agents // @Param wait query bool false "Opt in to durable reinit checks" // @Success 200 {object} agentsdk.ReinitializationEvent // @Failure 409 {object} codersdk.Response // @Router /api/v2/workspaceagents/me/reinit [get] func (api *API) workspaceAgentReinit(rw http.ResponseWriter, r *http.Request) { // Allow us to interrupt watch via cancel. ctx, cancel := context.WithCancel(r.Context()) defer cancel() r = r.WithContext(ctx) // Rewire context for SSE cancellation. workspaceAgent := httpmw.WorkspaceAgent(r) log := api.Logger.Named("workspace_agent_reinit_watcher").With( slog.F("workspace_agent_id", workspaceAgent.ID), ) workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID) if err != nil { log.Error(ctx, "failed to retrieve workspace from agent token", slog.Error(err)) httpapi.InternalServerError(rw, xerrors.New("failed to determine workspace from agent token")) return } log = log.With(slog.F("workspace_id", workspace.ID)) log.Info(ctx, "agent waiting for reinit instruction") // Subscribe to claim events BEFORE any durable checks to avoid a // TOCTOU race: without this, a claim could fire between the // IsPrebuild() check and the subscribe call, and we'd miss the // pubsub event entirely. By subscribing first, any event that // fires during the checks below is buffered in the channel. 
pubsubCh, cancelSub, err := prebuilds.NewPubsubWorkspaceClaimListener(api.Pubsub, log).ListenForWorkspaceClaims(ctx, workspace.ID) if err != nil { log.Error(ctx, "subscribe to prebuild claimed channel", slog.Error(err)) httpapi.InternalServerError(rw, xerrors.New("failed to subscribe to prebuild claimed channel")) return } defer cancelSub() reinitEvents := pubsubCh // Only perform the durable claim check when the agent opts in via // the "wait" query parameter. Older agents don't send the // "wait" query parameter and lack the duplicate-reinit guard, so // they would enter an infinite reinit loop if we pre-seeded the // channel on every connection. waitParam, _ := strconv.ParseBool(r.URL.Query().Get("wait")) if waitParam && !workspace.IsPrebuild() { firstBuild, err := api.Database.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{ WorkspaceID: workspace.ID, BuildNumber: 1, }) if err != nil { log.Error(ctx, "failed to get first workspace build", slog.Error(err)) httpapi.InternalServerError(rw, xerrors.New("failed to get first workspace build")) return } if firstBuild.InitiatorID != database.PrebuildsSystemUserID { // Not a claimed prebuild — this is a regular workspace. // Return 409 so the agent stops reconnecting to this // endpoint. httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{ Message: "Workspace is not a prebuilt workspace waiting to be claimed.", Detail: "This endpoint is only for agents running in prebuilt workspaces.", }) return } // This workspace was a prebuild that got claimed. Check if // the claim build completed successfully before sending // reinit. We assume the latest build is the claim build // (build 2). If a third build (e.g. a restart) starts // between the claim and the agent's reconnection, this // would check that build instead. The window is extremely // small in practice, and a restart would trigger its own // reinit path. 
latestBuild, err := api.Database.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspace.ID) if err != nil { log.Error(ctx, "failed to get latest workspace build", slog.Error(err)) httpapi.InternalServerError(rw, xerrors.New("failed to get latest workspace build")) return } job, err := api.Database.GetProvisionerJobByID(ctx, latestBuild.JobID) if err != nil { log.Error(ctx, "failed to get provisioner job", slog.Error(err)) httpapi.InternalServerError(rw, xerrors.New("failed to get provisioner job")) return } if job.CompletedAt.Valid && !job.Error.Valid { // Claim build succeeded — cancel the pubsub // subscription (no longer needed) and swap in a // pre-seeded channel so the transmitter delivers // exactly one reinit event. cancelSub() seeded := make(chan agentsdk.ReinitializationEvent, 1) seeded <- agentsdk.ReinitializationEvent{ WorkspaceID: workspace.ID, Reason: agentsdk.ReinitializeReasonPrebuildClaimed, OwnerID: workspace.OwnerID, } reinitEvents = seeded } else if job.CompletedAt.Valid && job.Error.Valid { // Claim build failed permanently. Return 409 so the // agent treats this as terminal and stops retrying // (WaitForReinitLoop exits on any 409). cancelSub() log.Warn(ctx, "claim build failed", slog.F("job_id", job.ID), slog.F("error", job.Error.String)) httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{ Message: "Claim build failed permanently.", Detail: job.Error.String, }) return } // Claim build still in progress — fall through to the // transmitter. The pubsub subscription (set up above) // will deliver the event when the build completes // successfully. Note: FailJob does not publish a claim // event, so a failed in-progress build will leave the // agent blocking here until it disconnects and // reconnects (at which point the durable check above // handles it). 
} transmitter := agentsdk.NewSSEAgentReinitTransmitter(log, rw, r) err = transmitter.Transmit(ctx, reinitEvents) switch { case errors.Is(err, agentsdk.ErrTransmissionSourceClosed): log.Info(ctx, "agent reinitialization subscription closed", slog.F("workspace_agent_id", workspaceAgent.ID)) case errors.Is(err, agentsdk.ErrTransmissionTargetClosed): log.Info(ctx, "agent connection closed", slog.F("workspace_agent_id", workspaceAgent.ID)) case errors.Is(err, context.Canceled): log.Info(ctx, "agent reinitialization", slog.Error(err)) case err != nil: log.Error(ctx, "failed to stream agent reinit events", slog.Error(err)) httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error streaming agent reinitialization events.", Detail: err.Error(), }) } } // convertProvisionedApps converts applications that are in the middle of provisioning process. // It means that they may not have an agent or workspace assigned (dry-run job). func convertProvisionedApps(dbApps []database.WorkspaceApp) []codersdk.WorkspaceApp { return db2sdk.Apps(dbApps, []database.WorkspaceAppStatus{}, database.WorkspaceAgent{}, "", database.WorkspaceTable{}) } func convertLogSources(dbLogSources []database.WorkspaceAgentLogSource) []codersdk.WorkspaceAgentLogSource { logSources := make([]codersdk.WorkspaceAgentLogSource, 0) for _, dbLogSource := range dbLogSources { logSources = append(logSources, codersdk.WorkspaceAgentLogSource{ ID: dbLogSource.ID, DisplayName: dbLogSource.DisplayName, WorkspaceAgentID: dbLogSource.WorkspaceAgentID, CreatedAt: dbLogSource.CreatedAt, Icon: dbLogSource.Icon, }) } return logSources } func convertScripts(dbScripts []database.GetWorkspaceAgentScriptsByAgentIDsRow) []codersdk.WorkspaceAgentScript { scripts := make([]codersdk.WorkspaceAgentScript, 0) for _, dbScript := range dbScripts { scripts = append(scripts, db2sdk.WorkspaceAgentScript(dbScript)) } return scripts } // @Summary Watch for workspace agent metadata updates // @ID 
watch-for-workspace-agent-metadata-updates // @Security CoderSessionToken // @Tags Agents // @Success 200 "Success" // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Router /api/v2/workspaceagents/{workspaceagent}/watch-metadata [get] // @x-apidocgen {"skip": true} // @Deprecated Use /workspaceagents/{workspaceagent}/watch-metadata-ws instead func (api *API) watchWorkspaceAgentMetadataSSE(rw http.ResponseWriter, r *http.Request) { api.watchWorkspaceAgentMetadata(rw, r, httpapi.ServerSentEventSender) } // @Summary Watch for workspace agent metadata updates via WebSockets // @ID watch-for-workspace-agent-metadata-updates-via-websockets // @Security CoderSessionToken // @Produce json // @Tags Agents // @Success 200 {object} codersdk.ServerSentEvent // @Param workspaceagent path string true "Workspace agent ID" format(uuid) // @Router /api/v2/workspaceagents/{workspaceagent}/watch-metadata-ws [get] // @x-apidocgen {"skip": true} func (api *API) watchWorkspaceAgentMetadataWS(rw http.ResponseWriter, r *http.Request) { api.watchWorkspaceAgentMetadata(rw, r, httpapi.OneWayWebSocketEventSender(api.Logger)) } func (api *API) watchWorkspaceAgentMetadata( rw http.ResponseWriter, r *http.Request, connect httpapi.EventSender, ) { // Allow us to interrupt watch via cancel. ctx, cancel := context.WithCancel(r.Context()) defer cancel() r = r.WithContext(ctx) // Rewire context for SSE cancellation. waws := httpmw.WorkspaceAgentAndWorkspaceParam(r) agentIDEncoded := make([]byte, metadatabatcher.UUIDBase64Size) err := metadatabatcher.EncodeAgentID(waws.WorkspaceAgent.ID, agentIDEncoded) if err != nil { httpapi.InternalServerError(rw, err) return } log := api.Logger.Named("workspace_metadata_watcher").With( slog.F("workspace_agent_id", waws.WorkspaceAgent.ID), slog.F("workspace_id", waws.WorkspaceTable.ID), ) // Send metadata on updates, we must ensure subscription before sending // initial metadata to guarantee that events in-between are not missed. 
// The channel carries no data - it's just a signal to fetch all metadata. update := make(chan struct{}, 1) // Subscribe to the global batched metadata channel. // The batcher publishes only to this channel to achieve O(1) NOTIFY scaling. cancelBatchSub, err := api.Pubsub.Subscribe(metadatabatcher.MetadataBatchPubsubChannel, func(_ context.Context, byt []byte) { if ctx.Err() != nil { return } if len(byt)%metadatabatcher.UUIDBase64Size != 0 { log.Error(ctx, "invalid batched pubsub message, pubsub message length was not a multiple of encoded agent UUID length", slog.Error(err)) return } // Compare each encoded agentID to our encoded agent ID. for i := 0; i < len(byt); i += metadatabatcher.UUIDBase64Size { if !bytes.Equal(byt[i:i+metadatabatcher.UUIDBase64Size], agentIDEncoded) { continue } log.Debug(ctx, "received metadata update from batch channel", slog.F("agent_id", waws.WorkspaceAgent.ID), slog.F("batch_size", len(byt)/metadatabatcher.UUIDBase64Size), ) // Signal to re-fetch all metadata for this agent. // Batch notifications don't include which keys changed, so we // always fetch all keys for this agent. // Attempt to read from the channel first so that we do not block on the write. select { case <-update: default: } update <- struct{}{} break } }) if err != nil { httpapi.InternalServerError(rw, err) return } defer cancelBatchSub() // We always use the original Request context because it contains // the RBAC actor. initialMD, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{ WorkspaceAgentID: waws.WorkspaceAgent.ID, Keys: nil, }) if err != nil { // If we can't successfully pull the initial metadata, pubsub // updates will be no-op so we may as well terminate the // connection early. 
httpapi.InternalServerError(rw, err) return } log.Debug(ctx, "got initial metadata", slog.F("num", len(initialMD))) metadataMap := make(map[string]database.WorkspaceAgentMetadatum, len(initialMD)) for _, datum := range initialMD { metadataMap[datum.Key] = datum } //nolint:ineffassign // Release memory. initialMD = nil sendEvent, senderClosed, err := connect(rw, r) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error setting up server-sent events.", Detail: err.Error(), }) return } // Prevent handler from returning until the sender is closed. defer func() { cancel() <-senderClosed }() // Synchronize cancellation from SSE -> context, this lets us simplify the // cancellation logic. go func() { select { case <-ctx.Done(): case <-senderClosed: cancel() } }() var lastSend time.Time sendMetadata := func() { lastSend = time.Now() values := maps.Values(metadataMap) log.Debug(ctx, "sending metadata", slog.F("num", len(values))) _ = sendEvent(codersdk.ServerSentEvent{ Type: codersdk.ServerSentEventTypeData, Data: convertWorkspaceAgentMetadata(values), }) } // We send updates exactly every second. const sendInterval = time.Second * 1 sendTicker := time.NewTicker(sendInterval) defer sendTicker.Stop() // Log the request immediately instead of after it completes. if rl := loggermw.RequestLoggerFromContext(ctx); rl != nil { rl.WriteLog(ctx, http.StatusAccepted) } // Send initial metadata. sendMetadata() // Fetch updated metadata keys as they come in. fetchedMetadata := make(chan []database.WorkspaceAgentMetadatum) go func() { defer close(fetchedMetadata) defer cancel() for { select { case <-ctx.Done(): return case <-update: // Batch notification received - fetch all metadata for this agent. 
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{ WorkspaceAgentID: waws.WorkspaceAgent.ID, Keys: nil, // nil means fetch all keys }) if err != nil { if !database.IsQueryCanceledError(err) { log.Error(ctx, "failed to get metadata", slog.Error(err)) _ = sendEvent(codersdk.ServerSentEvent{ Type: codersdk.ServerSentEventTypeError, Data: codersdk.Response{ Message: "Failed to get metadata.", Detail: err.Error(), }, }) } return } select { case <-ctx.Done(): return // We want to block here to avoid constantly pinging the // database when the metadata isn't being processed. case fetchedMetadata <- md: log.Debug(ctx, "fetched all metadata after batch update", slog.F("num", len(md))) } } } }() defer func() { <-fetchedMetadata }() pendingChanges := true for { select { case <-ctx.Done(): return case md, ok := <-fetchedMetadata: if !ok { return } for _, datum := range md { metadataMap[datum.Key] = datum } pendingChanges = true continue case <-sendTicker.C: // We send an update even if there's no change every 5 seconds // to ensure that the frontend always has an accurate "Result.Age". if !pendingChanges && time.Since(lastSend) < 5*time.Second { continue } pendingChanges = false } sendMetadata() } } func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []codersdk.WorkspaceAgentMetadata { // Sort the input database slice by DisplayOrder and then by Key before processing sort.Slice(db, func(i, j int) bool { if db[i].DisplayOrder == db[j].DisplayOrder { return db[i].Key < db[j].Key } return db[i].DisplayOrder < db[j].DisplayOrder }) // An empty array is easier for clients to handle than a null. 
result := make([]codersdk.WorkspaceAgentMetadata, len(db)) for i, datum := range db { result[i] = codersdk.WorkspaceAgentMetadata{ Result: codersdk.WorkspaceAgentMetadataResult{ Value: datum.Value, Error: datum.Error, CollectedAt: datum.CollectedAt.UTC(), Age: int64(time.Since(datum.CollectedAt).Seconds()), }, Description: codersdk.WorkspaceAgentMetadataDescription{ DisplayName: datum.DisplayName, Key: datum.Key, Script: datum.Script, Interval: datum.Interval, Timeout: datum.Timeout, }, } } return result } // workspaceAgentsExternalAuth returns an access token for a given URL // or finds a provider by ID. // // @Summary Get workspace agent external auth // @ID get-workspace-agent-external-auth // @Security CoderSessionToken // @Produce json // @Tags Agents // @Param match query string true "Match" // @Param id query string true "Provider ID" // @Param listen query bool false "Wait for a new token to be issued" // @Success 200 {object} agentsdk.ExternalAuthResponse // @Router /api/v2/workspaceagents/me/external-auth [get] func (api *API) workspaceAgentsExternalAuth(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() query := r.URL.Query() gitRef := chatGitRef{ Branch: strings.TrimSpace(query.Get("git_branch")), RemoteOrigin: strings.TrimSpace(query.Get("git_remote_origin")), } if raw := strings.TrimSpace(query.Get("chat_id")); raw != "" { if parsed, err := uuid.Parse(raw); err == nil { gitRef.ChatID = parsed } } // Either match or configID must be provided! match := query.Get("match") if match == "" { // Support legacy agents! 
match = query.Get("url") } id := query.Get("id") if match == "" && id == "" { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "'url' or 'id' must be provided!", }) return } if match != "" && id != "" { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "'url' and 'id' cannot be provided together!", }) return } // listen determines if the request will wait for a // new token to be issued! listen := query.Has("listen") var externalAuthConfig *externalauth.Config for _, extAuth := range api.ExternalAuthConfigs { if extAuth.ID == id { externalAuthConfig = extAuth break } if match == "" || extAuth.Regex == nil { continue } matches := extAuth.Regex.MatchString(match) if !matches { continue } externalAuthConfig = extAuth } if externalAuthConfig == nil { detail := "External auth provider not found." if len(api.ExternalAuthConfigs) > 0 { regexURLs := make([]string, 0, len(api.ExternalAuthConfigs)) for _, extAuth := range api.ExternalAuthConfigs { if extAuth.Regex == nil { continue } regexURLs = append(regexURLs, fmt.Sprintf("%s=%q", extAuth.ID, extAuth.Regex.String())) } detail = fmt.Sprintf("The configured external auth provider have regex filters that do not match the url. Provider url regex: %s", strings.Join(regexURLs, ",")) } httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{ Message: fmt.Sprintf("No matching external auth provider found in Coder for the url %q.", match), Detail: detail, }) return } workspaceAgent := httpmw.WorkspaceAgent(r) // We must get the workspace to get the owner ID! 
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to get workspace resource.", Detail: err.Error(), }) return } build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to get build.", Detail: err.Error(), }) return } workspace, err := api.Database.GetWorkspaceByID(ctx, build.WorkspaceID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to get workspace.", Detail: err.Error(), }) return } // Pre-check if the caller can read the external auth links for the owner of the // workspace. Do this up front because a sql.ErrNoRows is expected if the user is // in the flow of authenticating. If no row is present, the auth check is delayed // until the user authenticates. It is preferred to reject early. if !api.Authorize(r, policy.ActionReadPersonal, rbac.ResourceUserObject(workspace.OwnerID)) { httpapi.Forbidden(rw) return } // MarkStale will trigger a refresh by coderd/gitsync. This allows us to // persist git refs as soon as the agent requests external auth so branch // context is retained even if the flow requires an out-of-band login. if gitRef.Branch != "" && gitRef.RemoteOrigin != "" { //nolint:gocritic // Chat processor context required for cross-user chat lookup api.gitSyncWorker.MarkStale(dbauthz.AsChatd(ctx), gitsync.MarkStaleParams{ WorkspaceID: workspace.ID, Branch: gitRef.Branch, Origin: gitRef.RemoteOrigin, ChatID: gitRef.ChatID, }) } var previousToken *database.ExternalAuthLink // handleRetrying will attempt to continually check for a new token // if listen is true. This is useful if an error is encountered in the // original single flow. // // By default, if no errors are encountered, then the single flow response // is returned. 
handleRetrying := func(code int, response any) { if !listen { httpapi.Write(ctx, rw, code, response) return } api.workspaceAgentsExternalAuthListen(ctx, rw, previousToken, externalAuthConfig, workspace, gitRef) } // This is the URL that will redirect the user with a state token. redirectURL, err := api.AccessURL.Parse(fmt.Sprintf("/external-auth/%s", externalAuthConfig.ID)) if err != nil { handleRetrying(http.StatusInternalServerError, codersdk.Response{ Message: "Failed to parse access URL.", Detail: err.Error(), }) return } externalAuthLink, err := api.Database.GetExternalAuthLink(ctx, database.GetExternalAuthLinkParams{ ProviderID: externalAuthConfig.ID, UserID: workspace.OwnerID, }) if err != nil { if !errors.Is(err, sql.ErrNoRows) { handleRetrying(http.StatusInternalServerError, codersdk.Response{ Message: "Failed to get external auth link.", Detail: err.Error(), }) return } handleRetrying(http.StatusOK, agentsdk.ExternalAuthResponse{ URL: redirectURL.String(), }) return } refreshedLink, err := externalAuthConfig.RefreshToken(ctx, api.Database, externalAuthLink) if err != nil && !externalauth.IsInvalidTokenError(err) { handleRetrying(http.StatusInternalServerError, codersdk.Response{ Message: "Failed to refresh external auth token.", Detail: err.Error(), }) return } if err != nil { // Set the previous token so the retry logic will skip validating the // same token again. This should only be set if the token is invalid and there // was no error. If it is invalid because of an error, then we should recheck. 
previousToken = &refreshedLink handleRetrying(http.StatusOK, agentsdk.ExternalAuthResponse{ URL: redirectURL.String(), }) return } resp, err := createExternalAuthResponse(externalAuthConfig.Type, refreshedLink.OAuthAccessToken, refreshedLink.OAuthExtra) if err != nil { handleRetrying(http.StatusInternalServerError, codersdk.Response{ Message: "Failed to create external auth response.", Detail: err.Error(), }) return } httpapi.Write(ctx, rw, http.StatusOK, resp) } func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.ResponseWriter, previous *database.ExternalAuthLink, externalAuthConfig *externalauth.Config, workspace database.Workspace, gitRef chatGitRef) { // Since we're ticking frequently and this sign-in operation is rare, // we are OK with polling to avoid the complexity of pubsub. ticker, done := api.NewTicker(time.Second) defer done() // If we have a previous token that is invalid, we should not check this again. // This serves to prevent doing excessive unauthorized requests to the external // auth provider. For github, this limit is 60 per hour, so saving a call // per invalid token can be significant. var previousToken database.ExternalAuthLink if previous != nil { previousToken = *previous } for { select { case <-ctx.Done(): return case <-ticker: } externalAuthLink, err := api.Database.GetExternalAuthLink(ctx, database.GetExternalAuthLinkParams{ ProviderID: externalAuthConfig.ID, UserID: workspace.OwnerID, }) if err != nil { if errors.Is(err, sql.ErrNoRows) { continue } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to get external auth link.", Detail: err.Error(), }) return } // Expiry may be unset if the application doesn't configure tokens // to expire. // See // https://docs.github.com/en/apps/creating-github-apps/authenticating-with-a-github-app/generating-a-user-access-token-for-a-github-app. 
if externalAuthLink.OAuthExpiry.Before(dbtime.Now()) && !externalAuthLink.OAuthExpiry.IsZero() { continue } // Only attempt to revalidate an oauth token if it has actually changed. // No point in trying to validate the same token over and over again. if previousToken.OAuthAccessToken == externalAuthLink.OAuthAccessToken && previousToken.OAuthRefreshToken == externalAuthLink.OAuthRefreshToken && previousToken.OAuthExpiry.Equal(externalAuthLink.OAuthExpiry) { continue } valid, _, err := externalAuthConfig.ValidateToken(ctx, externalAuthLink.OAuthToken()) if err != nil { api.Logger.Warn(ctx, "failed to validate external auth token", slog.F("workspace_owner_id", workspace.OwnerID.String()), slog.F("validate_url", externalAuthConfig.ValidateURL), slog.Error(err), ) } previousToken = externalAuthLink if !valid { continue } resp, err := createExternalAuthResponse(externalAuthConfig.Type, externalAuthLink.OAuthAccessToken, externalAuthLink.OAuthExtra) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to create external auth response.", Detail: err.Error(), }) return } // MarkStale will trigger a refresh by coderd/gitsync. //nolint:gocritic // Chat processor context required for cross-user chat lookup api.gitSyncWorker.MarkStale(dbauthz.AsChatd(ctx), gitsync.MarkStaleParams{ WorkspaceID: workspace.ID, Branch: gitRef.Branch, Origin: gitRef.RemoteOrigin, ChatID: gitRef.ChatID, }) httpapi.Write(ctx, rw, http.StatusOK, resp) return } } // @Summary User-scoped tailnet RPC connection // @ID user-scoped-tailnet-rpc-connection // @Security CoderSessionToken // @Tags Agents // @Success 101 // @Router /api/v2/tailnet [get] func (api *API) tailnetRPCConn(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() // This is used by Enterprise code to control the functionality of this route. // Namely, disabling the route using `CODER_BROWSER_ONLY`. 
override := api.WorkspaceClientCoordinateOverride.Load() if override != nil { overrideFunc := *override if overrideFunc != nil && overrideFunc(rw) { return } } version := "2.0" qv := r.URL.Query().Get("version") if qv != "" { version = qv } if err := proto.CurrentVersion.Validate(version); err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Unknown or unsupported API version", Validations: []codersdk.ValidationError{ {Field: "version", Detail: err.Error()}, }, }) return } peerID, err := api.handleResumeToken(ctx, rw, r) if err != nil { // handleResumeToken has already written the response. return } // Used to authorize tunnel request sshPrep, err := api.HTTPAuth.AuthorizeSQLFilter(r, policy.ActionSSH, rbac.ResourceWorkspace.Type) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error preparing sql filter.", Detail: err.Error(), }) return } api.WebsocketWaitMutex.Lock() api.WebsocketWaitGroup.Add(1) api.WebsocketWaitMutex.Unlock() defer api.WebsocketWaitGroup.Done() conn, err := websocket.Accept(rw, r, nil) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Failed to accept websocket.", Detail: err.Error(), }) return } ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) defer wsNetConn.Close() defer conn.Close(websocket.StatusNormalClosure, "") // Get user ID for telemetry apiKey := httpmw.APIKey(r) userID := apiKey.UserID.String() // Store connection telemetry event now := dbtime.Now() connectionTelemetryEvent := telemetry.UserTailnetConnection{ ConnectedAt: now, DisconnectedAt: nil, UserID: userID, PeerID: peerID.String(), DeviceID: nil, DeviceOS: nil, CoderDesktopVersion: nil, } fillCoderDesktopTelemetry(r, &connectionTelemetryEvent, api.Logger) api.Telemetry.Report(&telemetry.Snapshot{ UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent}, }) defer func() { // Update 
telemetry event with disconnection time disconnectTime := dbtime.Now() connectionTelemetryEvent.DisconnectedAt = &disconnectTime api.Telemetry.Report(&telemetry.Snapshot{ UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent}, }) }() ctx, cancel := context.WithCancel(ctx) defer cancel() go httpapi.HeartbeatClose(ctx, api.Logger, cancel, conn) err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{ Name: "client", ID: peerID, Auth: tailnet.ClientUserCoordinateeAuth{ Auth: &rbacAuthorizer{ sshPrep: sshPrep, db: api.Database, }, }, }) if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) { _ = conn.Close(websocket.StatusInternalError, err.Error()) return } } // fillCoderDesktopTelemetry fills out the provided event based on a Coder Desktop telemetry header on the request, if // present. func fillCoderDesktopTelemetry(r *http.Request, event *telemetry.UserTailnetConnection, logger slog.Logger) { // Parse desktop telemetry from header if it exists desktopTelemetryHeader := r.Header.Get(codersdk.CoderDesktopTelemetryHeader) if desktopTelemetryHeader != "" { var telemetryData codersdk.CoderDesktopTelemetry if err := telemetryData.FromHeader(desktopTelemetryHeader); err == nil { // Only set fields if they aren't empty if telemetryData.DeviceID != "" { event.DeviceID = &telemetryData.DeviceID } if telemetryData.DeviceOS != "" { event.DeviceOS = &telemetryData.DeviceOS } if telemetryData.CoderDesktopVersion != "" { event.CoderDesktopVersion = &telemetryData.CoderDesktopVersion } logger.Debug(r.Context(), "received desktop telemetry", slog.F("device_id", telemetryData.DeviceID), slog.F("device_os", telemetryData.DeviceOS), slog.F("desktop_version", telemetryData.CoderDesktopVersion)) } else { logger.Warn(r.Context(), "failed to parse desktop telemetry header", slog.Error(err)) } } } // createExternalAuthResponse creates an ExternalAuthResponse based on the // provider type. 
This is to support legacy `/workspaceagents/me/gitauth` // which uses `Username` and `Password`. func createExternalAuthResponse(typ, token string, extra pqtype.NullRawMessage) (agentsdk.ExternalAuthResponse, error) { var resp agentsdk.ExternalAuthResponse switch typ { case string(codersdk.EnhancedExternalAuthProviderGitLab): // https://stackoverflow.com/questions/25409700/using-gitlab-token-to-clone-without-authentication resp = agentsdk.ExternalAuthResponse{ Username: "oauth2", Password: token, } case string(codersdk.EnhancedExternalAuthProviderBitBucketCloud), string(codersdk.EnhancedExternalAuthProviderBitBucketServer): // The string "bitbucket" was a legacy parameter that needs to still be supported. // https://support.atlassian.com/bitbucket-cloud/docs/use-oauth-on-bitbucket-cloud/#Cloning-a-repository-with-an-access-token resp = agentsdk.ExternalAuthResponse{ Username: "x-token-auth", Password: token, } default: resp = agentsdk.ExternalAuthResponse{ Username: token, } } resp.AccessToken = token resp.Type = typ var err error if extra.Valid { err = json.Unmarshal(extra.RawMessage, &resp.TokenExtra) } return resp, err } func convertWorkspaceAgentLogs(logs []database.WorkspaceAgentLog) []codersdk.WorkspaceAgentLog { sdk := make([]codersdk.WorkspaceAgentLog, 0, len(logs)) for _, logEntry := range logs { sdk = append(sdk, db2sdk.WorkspaceAgentLog(logEntry)) } return sdk } // maxChatContextParts caps the number of parts per request to // prevent unbounded message payloads. const maxChatContextParts = 100 // maxChatContextFileBytes caps each context-file part to the same // 64KiB budget used when the agent reads instruction files from disk. const maxChatContextFileBytes = 64 * 1024 // maxChatContextRequestBodyBytes caps the JSON request body size for // agent-added context to roughly the same per-part budget used when // reading instruction files from disk. 
const maxChatContextRequestBodyBytes int64 = maxChatContextParts * maxChatContextFileBytes // sanitizeWorkspaceAgentContextFileContent applies prompt // sanitization, then enforces the 64KiB per-file budget. The // truncated flag is preserved when the caller already capped the // file before sending it. func sanitizeWorkspaceAgentContextFileContent( content string, truncated bool, ) (string, bool) { content = chatd.SanitizePromptText(content) if len(content) > maxChatContextFileBytes { content = content[:maxChatContextFileBytes] truncated = true } return content, truncated } // readChatContextBody reads and validates the request body for chat // context endpoints. It handles MaxBytesReader wrapping, error // responses, and body rewind. If the body is empty or whitespace-only // and allowEmpty is true, it returns false without writing an error. // //nolint:revive // Add and clear endpoints only differ by empty-body handling. func readChatContextBody(ctx context.Context, rw http.ResponseWriter, r *http.Request, dst any, allowEmpty bool) bool { r.Body = http.MaxBytesReader(rw, r.Body, maxChatContextRequestBodyBytes) body, err := io.ReadAll(r.Body) if err != nil { var maxBytesErr *http.MaxBytesError if errors.As(err, &maxBytesErr) { httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{ Message: "Request body too large.", Detail: fmt.Sprintf("Maximum request body size is %d bytes.", maxChatContextRequestBodyBytes), }) return false } httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Failed to read request body.", Detail: err.Error(), }) return false } if allowEmpty && len(bytes.TrimSpace(body)) == 0 { r.Body = http.NoBody return false } r.Body = io.NopCloser(bytes.NewReader(body)) return httpapi.Read(ctx, rw, r, dst) } // @x-apidocgen {"skip": true} func (api *API) workspaceAgentAddChatContext(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() workspaceAgent := httpmw.WorkspaceAgent(r) var req 
agentsdk.AddChatContextRequest if !readChatContextBody(ctx, rw, r, &req, false) { return } if len(req.Parts) == 0 { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "No context parts provided.", }) return } if len(req.Parts) > maxChatContextParts { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: fmt.Sprintf("Too many context parts (%d). Maximum is %d.", len(req.Parts), maxChatContextParts), }) return } // Filter to only non-empty context-file and skill parts. filtered := chatd.FilterContextParts(req.Parts, false) if len(filtered) == 0 { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "No context-file or skill parts provided.", }) return } req.Parts = filtered responsePartCount := 0 // Use system context for chat operations since the // workspace agent scope does not include chat resources. // We verify agent-to-chat ownership explicitly below. //nolint:gocritic // Agent needs system access to read/write chat resources. sysCtx := dbauthz.AsSystemRestricted(ctx) workspace, err := api.Database.GetWorkspaceByAgentID(sysCtx, workspaceAgent.ID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to determine workspace from agent token.", Detail: err.Error(), }) return } chat, err := resolveAgentChat(sysCtx, api.Database, workspaceAgent.ID, workspace.OwnerID, req.ChatID) if err != nil { writeAgentChatError(ctx, rw, err) return } // Stamp each persisted part with the agent identity. Context-file // parts also get server-authoritative workspace metadata. 
directory := workspaceAgent.ExpandedDirectory if directory == "" { directory = workspaceAgent.Directory } for i := range req.Parts { req.Parts[i].ContextFileAgentID = uuid.NullUUID{ UUID: workspaceAgent.ID, Valid: true, } if req.Parts[i].Type != codersdk.ChatMessagePartTypeContextFile { continue } req.Parts[i].ContextFileContent, req.Parts[i].ContextFileTruncated = sanitizeWorkspaceAgentContextFileContent( req.Parts[i].ContextFileContent, req.Parts[i].ContextFileTruncated, ) req.Parts[i].ContextFileOS = workspaceAgent.OperatingSystem req.Parts[i].ContextFileDirectory = directory } req.Parts = chatd.FilterContextParts(req.Parts, false) if len(req.Parts) == 0 { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "No context-file or skill parts provided.", }) return } responsePartCount = len(req.Parts) // Skill-only messages need a sentinel context-file part so the turn // pipeline trusts the associated skill metadata. req.Parts = prependAgentChatContextSentinelIfNeeded( req.Parts, workspaceAgent.ID, workspaceAgent.OperatingSystem, directory, ) content, err := chatprompt.MarshalParts(req.Parts) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to marshal context parts.", Detail: err.Error(), }) return } err = api.Database.InTx(func(tx database.Store) error { locked, err := tx.GetChatByIDForUpdate(sysCtx, chat.ID) if err != nil { return xerrors.Errorf("lock chat: %w", err) } if !isActiveAgentChat(locked) { return errChatNotActive } if !locked.AgentID.Valid || locked.AgentID.UUID != workspaceAgent.ID { return errChatDoesNotBelongToAgent } if locked.OwnerID != workspace.OwnerID { return errChatDoesNotBelongToWorkspaceOwner } if _, err := tx.InsertChatMessages(sysCtx, chatd.BuildSingleUserChatMessageInsertParams( chat.ID, "", // Agent-initiated context injection has no caller API key. 
content, database.ChatMessageVisibilityBoth, locked.LastModelConfigID, chatprompt.CurrentContentVersion, uuid.Nil, )); err != nil { return xerrors.Errorf("insert context message: %w", err) } if err := updateAgentChatLastInjectedContextFromMessages(sysCtx, api.Logger, tx, chat.ID); err != nil { return xerrors.Errorf("rebuild injected context cache: %w", err) } return nil }, nil) if err != nil { if errors.Is(err, errChatNotActive) || errors.Is(err, errChatDoesNotBelongToAgent) || errors.Is(err, errChatDoesNotBelongToWorkspaceOwner) { writeAgentChatError(ctx, rw, err) return } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to persist context message.", Detail: err.Error(), }) return } httpapi.Write(ctx, rw, http.StatusOK, agentsdk.AddChatContextResponse{ ChatID: chat.ID, Count: responsePartCount, }) } // @x-apidocgen {"skip": true} func (api *API) workspaceAgentClearChatContext(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() workspaceAgent := httpmw.WorkspaceAgent(r) var req agentsdk.ClearChatContextRequest populated := readChatContextBody(ctx, rw, r, &req, true) if !populated && r.Body != http.NoBody { return } // Use system context for chat operations since the // workspace agent scope does not include chat resources. //nolint:gocritic // Agent needs system access to read/write chat resources. sysCtx := dbauthz.AsSystemRestricted(ctx) workspace, err := api.Database.GetWorkspaceByAgentID(sysCtx, workspaceAgent.ID) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to determine workspace from agent token.", Detail: err.Error(), }) return } chat, err := resolveAgentChat(sysCtx, api.Database, workspaceAgent.ID, workspace.OwnerID, req.ChatID) if err != nil { // Zero active chats is not an error for clear. 
if errors.Is(err, errNoActiveChats) { httpapi.Write(ctx, rw, http.StatusOK, agentsdk.ClearChatContextResponse{}) return } writeAgentChatError(ctx, rw, err) return } err = clearAgentChatContext(sysCtx, api.Database, chat.ID, workspaceAgent.ID, workspace.OwnerID) if err != nil { if errors.Is(err, errChatNotActive) || errors.Is(err, errChatDoesNotBelongToAgent) || errors.Is(err, errChatDoesNotBelongToWorkspaceOwner) { writeAgentChatError(ctx, rw, err) return } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to clear context from chat.", Detail: err.Error(), }) return } httpapi.Write(ctx, rw, http.StatusOK, agentsdk.ClearChatContextResponse{ ChatID: chat.ID, }) } var ( errNoActiveChats = xerrors.New("no active chats found") errChatNotFound = xerrors.New("chat not found") errChatNotActive = xerrors.New("chat is not active") errChatDoesNotBelongToAgent = xerrors.New("chat does not belong to this agent") errChatDoesNotBelongToWorkspaceOwner = xerrors.New("chat does not belong to this workspace owner") ) type multipleActiveChatsError struct { count int } func (e *multipleActiveChatsError) Error() string { return fmt.Sprintf( "multiple active chats (%d) found for this agent, specify a chat ID", e.count, ) } func resolveDefaultAgentChat(chats []database.Chat) (database.Chat, error) { switch len(chats) { case 0: return database.Chat{}, errNoActiveChats case 1: return chats[0], nil } var rootChat *database.Chat for i := range chats { chat := &chats[i] if chat.ParentChatID.Valid { continue } if rootChat != nil { return database.Chat{}, &multipleActiveChatsError{count: len(chats)} } rootChat = chat } if rootChat != nil { return *rootChat, nil } return database.Chat{}, &multipleActiveChatsError{count: len(chats)} } // resolveAgentChat finds the target chat from either an explicit ID // or auto-detection via the agent's active chats. 
func resolveAgentChat( ctx context.Context, db database.Store, agentID uuid.UUID, workspaceOwnerID uuid.UUID, explicitChatID uuid.UUID, ) (database.Chat, error) { if explicitChatID == uuid.Nil { chats, err := db.GetActiveChatsByAgentID(ctx, agentID) if err != nil { return database.Chat{}, xerrors.Errorf("list active chats: %w", err) } ownerChats := make([]database.Chat, 0, len(chats)) for _, chat := range chats { if chat.OwnerID != workspaceOwnerID { continue } ownerChats = append(ownerChats, chat) } return resolveDefaultAgentChat(ownerChats) } chat, err := db.GetChatByID(ctx, explicitChatID) if err != nil { if errors.Is(err, sql.ErrNoRows) { return database.Chat{}, errChatNotFound } return database.Chat{}, xerrors.Errorf("get chat by id: %w", err) } if !chat.AgentID.Valid || chat.AgentID.UUID != agentID { return database.Chat{}, errChatDoesNotBelongToAgent } if chat.OwnerID != workspaceOwnerID { return database.Chat{}, errChatDoesNotBelongToWorkspaceOwner } if !isActiveAgentChat(chat) { return database.Chat{}, errChatNotActive } return chat, nil } func isActiveAgentChat(chat database.Chat) bool { if chat.Archived { return false } switch chat.Status { case database.ChatStatusWaiting, database.ChatStatusPending, database.ChatStatusRunning, database.ChatStatusPaused, database.ChatStatusRequiresAction: return true default: return false } } func clearAgentChatContext( ctx context.Context, db database.Store, chatID uuid.UUID, agentID uuid.UUID, workspaceOwnerID uuid.UUID, ) error { return db.InTx(func(tx database.Store) error { locked, err := tx.GetChatByIDForUpdate(ctx, chatID) if err != nil { return xerrors.Errorf("lock chat: %w", err) } if !isActiveAgentChat(locked) { return errChatNotActive } if !locked.AgentID.Valid || locked.AgentID.UUID != agentID { return errChatDoesNotBelongToAgent } if locked.OwnerID != workspaceOwnerID { return errChatDoesNotBelongToWorkspaceOwner } messages, err := tx.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{ 
ChatID: chatID, AfterID: 0, }) if err != nil { return xerrors.Errorf("get chat messages: %w", err) } hadInjectedContext := locked.LastInjectedContext.Valid var skillOnlyMessageIDs []int64 for _, msg := range messages { if !msg.Content.Valid { continue } hasContextFile := messageHasPartTypes(msg.Content.RawMessage, codersdk.ChatMessagePartTypeContextFile) hasSkill := messageHasPartTypes(msg.Content.RawMessage, codersdk.ChatMessagePartTypeSkill) if hasContextFile || hasSkill { hadInjectedContext = true } if hasSkill && !hasContextFile { skillOnlyMessageIDs = append(skillOnlyMessageIDs, msg.ID) } } if !hadInjectedContext { return nil } if err := tx.SoftDeleteContextFileMessages(ctx, chatID); err != nil { return xerrors.Errorf("soft delete context-file messages: %w", err) } for _, messageID := range skillOnlyMessageIDs { if err := tx.SoftDeleteChatMessageByID(ctx, messageID); err != nil { return xerrors.Errorf("soft delete context message %d: %w", messageID, err) } } // Reset provider-side Responses chaining so the next turn replays // the post-clear history instead of inheriting cleared context. if err := tx.ClearChatMessageProviderResponseIDsByChatID(ctx, chatID); err != nil { return xerrors.Errorf("clear provider response chain: %w", err) } // Clear the injected-context cache inside the transaction so it is // atomic with the soft-deletes. param, err := chatd.BuildLastInjectedContext(nil) if err != nil { return xerrors.Errorf("clear injected context cache: %w", err) } if _, err := tx.UpdateChatLastInjectedContext(ctx, database.UpdateChatLastInjectedContextParams{ ID: chatID, LastInjectedContext: param, }); err != nil { return xerrors.Errorf("clear injected context cache: %w", err) } return nil }, nil) } // prependAgentChatContextSentinelIfNeeded adds an empty context-file // part when the request only carries skills. The turn pipeline uses // the sentinel's agent metadata to trust the skill parts. 
func prependAgentChatContextSentinelIfNeeded( parts []codersdk.ChatMessagePart, agentID uuid.UUID, operatingSystem string, directory string, ) []codersdk.ChatMessagePart { hasContextFile := false hasSkill := false for _, part := range parts { switch part.Type { case codersdk.ChatMessagePartTypeContextFile: hasContextFile = true case codersdk.ChatMessagePartTypeSkill: hasSkill = true } if hasContextFile && hasSkill { return parts } } if !hasSkill || hasContextFile { return parts } return append([]codersdk.ChatMessagePart{{ Type: codersdk.ChatMessagePartTypeContextFile, ContextFilePath: chatd.AgentChatContextSentinelPath, ContextFileAgentID: uuid.NullUUID{ UUID: agentID, Valid: true, }, ContextFileOS: operatingSystem, ContextFileDirectory: directory, }}, parts...) } func sortChatMessagesByCreatedAtAndID(messages []database.ChatMessage) { sort.SliceStable(messages, func(i, j int) bool { if messages[i].CreatedAt.Equal(messages[j].CreatedAt) { return messages[i].ID < messages[j].ID } return messages[i].CreatedAt.Before(messages[j].CreatedAt) }) } // updateAgentChatLastInjectedContextFromMessages rebuilds the // injected-context cache from all persisted context-file and skill parts. 
func updateAgentChatLastInjectedContextFromMessages( ctx context.Context, logger slog.Logger, db database.Store, chatID uuid.UUID, ) error { messages, err := db.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{ ChatID: chatID, AfterID: 0, }) if err != nil { return xerrors.Errorf("load context messages for injected context: %w", err) } sortChatMessagesByCreatedAtAndID(messages) parts, err := chatd.CollectContextPartsFromMessages(ctx, logger, messages, true) if err != nil { return xerrors.Errorf("collect injected context parts: %w", err) } parts = chatd.FilterContextPartsToLatestAgent(parts) param, err := chatd.BuildLastInjectedContext(parts) if err != nil { return xerrors.Errorf("update injected context: %w", err) } if _, err := db.UpdateChatLastInjectedContext(ctx, database.UpdateChatLastInjectedContextParams{ ID: chatID, LastInjectedContext: param, }); err != nil { return xerrors.Errorf("update injected context: %w", err) } return nil } func messageHasPartTypes(raw []byte, types ...codersdk.ChatMessagePartType) bool { var parts []codersdk.ChatMessagePart if err := json.Unmarshal(raw, &parts); err != nil { return false } for _, part := range parts { for _, typ := range types { if part.Type == typ { return true } } } return false } // writeAgentChatError translates resolveAgentChat errors to HTTP // responses. 
func writeAgentChatError( ctx context.Context, rw http.ResponseWriter, err error, ) { if errors.Is(err, errNoActiveChats) { httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{ Message: "No active chats found for this agent.", }) return } if errors.Is(err, errChatNotFound) { httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{ Message: "Chat not found.", }) return } if errors.Is(err, errChatDoesNotBelongToAgent) { httpapi.Write(ctx, rw, http.StatusForbidden, codersdk.Response{ Message: "Chat does not belong to this agent.", }) return } if errors.Is(err, errChatDoesNotBelongToWorkspaceOwner) { httpapi.Write(ctx, rw, http.StatusForbidden, codersdk.Response{ Message: "Chat does not belong to this workspace owner.", }) return } if errors.Is(err, errChatNotActive) { httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{ Message: "Cannot modify context: this chat is no longer active.", }) return } var multipleErr *multipleActiveChatsError if errors.As(err, &multipleErr) { httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{ Message: err.Error(), }) return } httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Failed to resolve chat.", Detail: err.Error(), }) }