mirror of
https://github.com/coder/coder.git
synced 2026-06-03 04:58:23 +00:00
393b3874ac
<!-- If you have used AI to produce some or all of this PR, please ensure you have read our [AI Contribution guidelines](https://coder.com/docs/about/contributing/AI_CONTRIBUTING) before submitting. --> part of https://github.com/coder/coder/issues/21335 This moves updating app status (used by Tasks) into the workspace agent API over dRPC. This will allow us to update the status without having to re-authenticate each time, like we would with an HTTP PATCH request. Further PRs in this stack will pipe these requests thru from the CLI MCP server to the agentsock and finally to this dRPC call to coderd.
2261 lines
72 KiB
Go
2261 lines
72 KiB
Go
package coderd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"slices"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/google/uuid"
|
|
"github.com/sqlc-dev/pqtype"
|
|
"golang.org/x/exp/maps"
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/xerrors"
|
|
"tailscale.com/tailcfg"
|
|
|
|
"cdr.dev/slog/v3"
|
|
"github.com/coder/coder/v2/coderd/agentapi"
|
|
"github.com/coder/coder/v2/coderd/agentapi/metadatabatcher"
|
|
"github.com/coder/coder/v2/coderd/database"
|
|
"github.com/coder/coder/v2/coderd/database/db2sdk"
|
|
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
|
"github.com/coder/coder/v2/coderd/database/dbtime"
|
|
"github.com/coder/coder/v2/coderd/externalauth"
|
|
"github.com/coder/coder/v2/coderd/httpapi"
|
|
"github.com/coder/coder/v2/coderd/httpmw"
|
|
"github.com/coder/coder/v2/coderd/httpmw/loggermw"
|
|
"github.com/coder/coder/v2/coderd/jwtutils"
|
|
"github.com/coder/coder/v2/coderd/prebuilds"
|
|
"github.com/coder/coder/v2/coderd/rbac"
|
|
"github.com/coder/coder/v2/coderd/rbac/policy"
|
|
"github.com/coder/coder/v2/coderd/telemetry"
|
|
maputil "github.com/coder/coder/v2/coderd/util/maps"
|
|
"github.com/coder/coder/v2/coderd/wspubsub"
|
|
"github.com/coder/coder/v2/codersdk"
|
|
"github.com/coder/coder/v2/codersdk/agentsdk"
|
|
"github.com/coder/coder/v2/codersdk/workspacesdk"
|
|
"github.com/coder/coder/v2/codersdk/wsjson"
|
|
"github.com/coder/coder/v2/tailnet"
|
|
"github.com/coder/coder/v2/tailnet/proto"
|
|
"github.com/coder/websocket"
|
|
)
|
|
|
|
// @Summary Get workspace agent by ID
|
|
// @ID get-workspace-agent-by-id
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Success 200 {object} codersdk.WorkspaceAgent
|
|
// @Router /workspaceagents/{workspaceagent} [get]
|
|
func (api *API) workspaceAgent(rw http.ResponseWriter, r *http.Request) {
|
|
var (
|
|
ctx = r.Context()
|
|
waws = httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
dbApps []database.WorkspaceApp
|
|
scripts []database.WorkspaceAgentScript
|
|
logSources []database.WorkspaceAgentLogSource
|
|
)
|
|
|
|
var eg errgroup.Group
|
|
eg.Go(func() (err error) {
|
|
dbApps, err = api.Database.GetWorkspaceAppsByAgentID(ctx, waws.WorkspaceAgent.ID)
|
|
return err
|
|
})
|
|
eg.Go(func() (err error) {
|
|
//nolint:gocritic // TODO: can we make this not require system restricted?
|
|
scripts, err = api.Database.GetWorkspaceAgentScriptsByAgentIDs(dbauthz.AsSystemRestricted(ctx), []uuid.UUID{waws.WorkspaceAgent.ID})
|
|
return err
|
|
})
|
|
eg.Go(func() (err error) {
|
|
//nolint:gocritic // TODO: can we make this not require system restricted?
|
|
logSources, err = api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(dbauthz.AsSystemRestricted(ctx), []uuid.UUID{waws.WorkspaceAgent.ID})
|
|
return err
|
|
})
|
|
err := eg.Wait()
|
|
if httpapi.Is404Error(err) {
|
|
httpapi.ResourceNotFound(rw)
|
|
return
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
appIDs := []uuid.UUID{}
|
|
for _, app := range dbApps {
|
|
appIDs = append(appIDs, app.ID)
|
|
}
|
|
// nolint:gocritic // This is a system restricted operation.
|
|
statuses, err := api.Database.GetWorkspaceAppStatusesByAppIDs(dbauthz.AsSystemRestricted(ctx), appIDs)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace app statuses.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent, db2sdk.Apps(dbApps, statuses, waws.WorkspaceAgent, waws.OwnerUsername, waws.WorkspaceTable), convertScripts(scripts), convertLogSources(logSources), api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, apiAgent)
|
|
}
|
|
|
|
const AgentAPIVersionREST = "1.0"
|
|
|
|
// @Summary Patch workspace agent logs
|
|
// @ID patch-workspace-agent-logs
|
|
// @Security CoderSessionToken
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param request body agentsdk.PatchLogs true "logs"
|
|
// @Success 200 {object} codersdk.Response
|
|
// @Router /workspaceagents/me/logs [patch]
|
|
func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
workspaceAgent := httpmw.WorkspaceAgent(r)
|
|
|
|
var req agentsdk.PatchLogs
|
|
if !httpapi.Read(ctx, rw, r, &req) {
|
|
return
|
|
}
|
|
if len(req.Logs) == 0 {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "No logs provided.",
|
|
})
|
|
return
|
|
}
|
|
// This is to support the legacy API where the log source ID was
|
|
// not provided in the request body. We default to the external
|
|
// log source in this case.
|
|
if req.LogSourceID == uuid.Nil {
|
|
// Use the external log source
|
|
externalSources, err := api.Database.InsertWorkspaceAgentLogSources(ctx, database.InsertWorkspaceAgentLogSourcesParams{
|
|
WorkspaceAgentID: workspaceAgent.ID,
|
|
CreatedAt: dbtime.Now(),
|
|
ID: []uuid.UUID{agentsdk.ExternalLogSourceID},
|
|
DisplayName: []string{"External"},
|
|
Icon: []string{"/emojis/1f310.png"},
|
|
})
|
|
if database.IsUniqueViolation(err, database.UniqueWorkspaceAgentLogSourcesPkey) {
|
|
err = nil
|
|
req.LogSourceID = agentsdk.ExternalLogSourceID
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to create external log source.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if len(externalSources) == 1 {
|
|
req.LogSourceID = externalSources[0].ID
|
|
}
|
|
}
|
|
output := make([]string, 0)
|
|
level := make([]database.LogLevel, 0)
|
|
outputLength := 0
|
|
for _, logEntry := range req.Logs {
|
|
output = append(output, logEntry.Output)
|
|
outputLength += len(logEntry.Output)
|
|
if logEntry.Level == "" {
|
|
// Default to "info" to support older agents that didn't have the level field.
|
|
logEntry.Level = codersdk.LogLevelInfo
|
|
}
|
|
parsedLevel := database.LogLevel(logEntry.Level)
|
|
if !parsedLevel.Valid() {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Invalid log level provided.",
|
|
Detail: fmt.Sprintf("invalid log level: %q", logEntry.Level),
|
|
})
|
|
return
|
|
}
|
|
level = append(level, parsedLevel)
|
|
}
|
|
|
|
logs, err := api.Database.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{
|
|
AgentID: workspaceAgent.ID,
|
|
CreatedAt: dbtime.Now(),
|
|
Output: output,
|
|
Level: level,
|
|
LogSourceID: req.LogSourceID,
|
|
// #nosec G115 - Log output length is limited and fits in int32
|
|
OutputLength: int32(outputLength),
|
|
})
|
|
if err != nil {
|
|
if !database.IsWorkspaceAgentLogsLimitError(err) {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to upload logs",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if workspaceAgent.LogsOverflowed {
|
|
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
|
|
Message: "Logs limit exceeded",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
err := api.Database.UpdateWorkspaceAgentLogOverflowByID(ctx, database.UpdateWorkspaceAgentLogOverflowByIDParams{
|
|
ID: workspaceAgent.ID,
|
|
LogsOverflowed: true,
|
|
})
|
|
if err != nil {
|
|
// We don't want to return here, because the agent will retry
|
|
// on failure and this isn't a huge deal. The overflow state
|
|
// is just a hint to the user that the logs are incomplete.
|
|
api.Logger.Warn(ctx, "failed to update workspace agent log overflow", slog.Error(err))
|
|
}
|
|
|
|
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to get workspace.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
|
|
Kind: wspubsub.WorkspaceEventKindAgentLogsOverflow,
|
|
WorkspaceID: workspace.ID,
|
|
AgentID: &workspaceAgent.ID,
|
|
})
|
|
|
|
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
|
|
Message: "Logs limit exceeded",
|
|
})
|
|
return
|
|
}
|
|
|
|
lowestLogID := logs[0].ID
|
|
|
|
// Publish by the lowest log ID inserted so the
|
|
// log stream will fetch everything from that point.
|
|
api.publishWorkspaceAgentLogsUpdate(ctx, workspaceAgent.ID, agentsdk.LogsNotifyMessage{
|
|
CreatedAfter: lowestLogID - 1,
|
|
})
|
|
|
|
if workspaceAgent.LogsLength == 0 {
|
|
// If these are the first logs being appended, we publish a UI update
|
|
// to notify the UI that logs are now available.
|
|
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to get workspace.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
|
|
Kind: wspubsub.WorkspaceEventKindAgentFirstLogs,
|
|
WorkspaceID: workspace.ID,
|
|
AgentID: &workspaceAgent.ID,
|
|
})
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, nil)
|
|
}
|
|
|
|
// @Summary Patch workspace agent app status
|
|
// @ID patch-workspace-agent-app-status
|
|
// @Security CoderSessionToken
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param request body agentsdk.PatchAppStatus true "app status"
|
|
// @Success 200 {object} codersdk.Response
|
|
// @Router /workspaceagents/me/app-status [patch]
|
|
// @Deprecated Use UpdateAppStatus on the Agent API instead.
|
|
func (api *API) patchWorkspaceAgentAppStatus(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
workspaceAgent := httpmw.WorkspaceAgent(r)
|
|
|
|
var req agentsdk.PatchAppStatus
|
|
if !httpapi.Read(ctx, rw, r, &req) {
|
|
return
|
|
}
|
|
|
|
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to get workspace.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// This functionality has been moved to the AppsAPI in the agentapi. We keep this HTTP handler around for back
|
|
// compatibility with older agents. We'll translate the request into the protobuf so there is only one primary
|
|
// implementation.
|
|
appAPI := &agentapi.AppsAPI{
|
|
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
|
|
return workspaceAgent, nil
|
|
},
|
|
Database: api.Database,
|
|
Log: api.Logger,
|
|
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
|
|
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
|
|
Kind: kind,
|
|
WorkspaceID: workspace.ID,
|
|
AgentID: &agent.ID,
|
|
})
|
|
return nil
|
|
},
|
|
NotificationsEnqueuer: api.NotificationsEnqueuer,
|
|
Clock: api.Clock,
|
|
}
|
|
protoReq, err := agentsdk.ProtoFromPatchAppStatus(req)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to parse request.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
_, err = appAPI.UpdateAppStatus(r.Context(), protoReq)
|
|
if err != nil {
|
|
sdkErr := new(codersdk.Error)
|
|
if xerrors.As(err, &sdkErr) {
|
|
httpapi.Write(ctx, rw, sdkErr.StatusCode(), sdkErr.Response)
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to update app status.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusOK, nil)
|
|
}
|
|
|
|
// workspaceAgentLogs returns the logs associated with a workspace agent
|
|
//
|
|
// @Summary Get logs by workspace agent
|
|
// @ID get-logs-by-workspace-agent
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Param before query int false "Before log id"
|
|
// @Param after query int false "After log id"
|
|
// @Param follow query bool false "Follow log stream"
|
|
// @Param no_compression query bool false "Disable compression for WebSocket connection"
|
|
// @Param format query string false "Log output format. Accepted: 'json' (default), 'text' (plain text with RFC3339 timestamps and ANSI colors). Not supported with follow=true." Enums(json,text)
|
|
// @Success 200 {array} codersdk.WorkspaceAgentLog
|
|
// @Router /workspaceagents/{workspaceagent}/logs [get]
|
|
func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
|
|
// This mostly copies how provisioner job logs are streamed!
|
|
var (
|
|
ctx = r.Context()
|
|
waws = httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
logger = api.Logger.With(slog.F("workspace_agent_id", waws.WorkspaceAgent.ID))
|
|
follow = r.URL.Query().Has("follow")
|
|
afterRaw = r.URL.Query().Get("after")
|
|
noCompression = r.URL.Query().Has("no_compression")
|
|
format = r.URL.Query().Get("format")
|
|
)
|
|
|
|
// Validate format parameter.
|
|
if format == "" {
|
|
format = "json"
|
|
}
|
|
if format != "json" && format != "text" {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Invalid format parameter.",
|
|
Detail: "Allowed values are \"json\" and \"text\".",
|
|
})
|
|
return
|
|
}
|
|
|
|
// Text format is not supported with streaming.
|
|
if format == "text" && follow {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Text format is not supported with follow mode.",
|
|
Detail: "Use format=json or omit the follow parameter.",
|
|
})
|
|
return
|
|
}
|
|
|
|
var after int64
|
|
// Only fetch logs created after the time provided.
|
|
if afterRaw != "" {
|
|
var err error
|
|
after, err = strconv.ParseInt(afterRaw, 10, 64)
|
|
if err != nil || after < 0 {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Query param \"after\" must be an integer greater than or equal to zero.",
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "after", Detail: "Must be an integer greater than or equal to zero"},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
}
|
|
|
|
logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
|
|
AgentID: waws.WorkspaceAgent.ID,
|
|
CreatedAfter: after,
|
|
})
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner logs.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if logs == nil {
|
|
logs = []database.WorkspaceAgentLog{}
|
|
}
|
|
|
|
if !follow {
|
|
if format == "text" {
|
|
sids, err := api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(ctx, []uuid.UUID{waws.WorkspaceAgent.ID})
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent log sources.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
lsids := make(map[uuid.UUID]string, len(sids))
|
|
for _, sid := range sids {
|
|
lsids[sid.ID] = sid.DisplayName
|
|
}
|
|
rw.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
rw.WriteHeader(http.StatusOK)
|
|
for _, log := range logs {
|
|
_, _ = rw.Write([]byte(db2sdk.WorkspaceAgentLog(log).Text(waws.WorkspaceAgent.Name, lsids[log.LogSourceID])))
|
|
_, _ = rw.Write([]byte("\n"))
|
|
}
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusOK, convertWorkspaceAgentLogs(logs))
|
|
return
|
|
}
|
|
|
|
api.WebsocketWaitMutex.Lock()
|
|
api.WebsocketWaitGroup.Add(1)
|
|
api.WebsocketWaitMutex.Unlock()
|
|
defer api.WebsocketWaitGroup.Done()
|
|
|
|
opts := &websocket.AcceptOptions{}
|
|
|
|
// Allow client to request no compression. This is useful for buggy
|
|
// clients or if there's a client/server incompatibility. This is
|
|
// needed with e.g. coder/websocket and Safari (confirmed in 16.5).
|
|
//
|
|
// See:
|
|
// * https://github.com/nhooyr/websocket/issues/218
|
|
// * https://github.com/gobwas/ws/issues/169
|
|
if noCompression {
|
|
opts.CompressionMode = websocket.CompressionDisabled
|
|
}
|
|
|
|
conn, err := websocket.Accept(rw, r, opts)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to accept websocket.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
go httpapi.HeartbeatClose(ctx, api.Logger, cancel, conn)
|
|
|
|
encoder := wsjson.NewEncoder[[]codersdk.WorkspaceAgentLog](conn, websocket.MessageText)
|
|
defer encoder.Close(websocket.StatusNormalClosure)
|
|
|
|
err = encoder.Encode(convertWorkspaceAgentLogs(logs))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
lastSentLogID := after
|
|
if len(logs) > 0 {
|
|
lastSentLogID = logs[len(logs)-1].ID
|
|
}
|
|
|
|
workspaceNotifyCh := make(chan struct{}, 1)
|
|
notifyCh := make(chan struct{}, 1)
|
|
// Allow us to immediately check if we missed any logs
|
|
// between initial fetch and subscribe.
|
|
notifyCh <- struct{}{}
|
|
|
|
// Subscribe to workspace to detect new builds.
|
|
closeSubscribeWorkspace, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(waws.WorkspaceTable.OwnerID),
|
|
wspubsub.HandleWorkspaceEvent(
|
|
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
|
|
if err != nil {
|
|
return
|
|
}
|
|
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == waws.WorkspaceTable.ID {
|
|
select {
|
|
case workspaceNotifyCh <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}))
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to subscribe to workspace for log streaming.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer closeSubscribeWorkspace()
|
|
// Subscribe early to prevent missing log events.
|
|
closeSubscribe, err := api.Pubsub.Subscribe(agentsdk.LogsNotifyChannel(waws.WorkspaceAgent.ID), func(_ context.Context, _ []byte) {
|
|
// The message is not important, we're tracking lastSentLogID manually.
|
|
select {
|
|
case notifyCh <- struct{}{}:
|
|
default:
|
|
}
|
|
})
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to subscribe to agent for log streaming.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer closeSubscribe()
|
|
|
|
// Buffer size controls the log prefetch capacity.
|
|
bufferedLogs := make(chan []database.WorkspaceAgentLog, 8)
|
|
// Check at least once per minute in case we didn't receive a pubsub message.
|
|
recheckInterval := time.Minute
|
|
t := time.NewTicker(recheckInterval)
|
|
defer t.Stop()
|
|
|
|
// Log the request immediately instead of after it completes.
|
|
if rl := loggermw.RequestLoggerFromContext(ctx); rl != nil {
|
|
rl.WriteLog(ctx, http.StatusAccepted)
|
|
}
|
|
|
|
go func() {
|
|
defer func() {
|
|
logger.Debug(ctx, "end log streaming loop")
|
|
close(bufferedLogs)
|
|
}()
|
|
logger.Debug(ctx, "start log streaming loop", slog.F("last_sent_log_id", lastSentLogID))
|
|
|
|
keepGoing := true
|
|
for keepGoing {
|
|
var (
|
|
debugTriggeredBy string
|
|
onlyCheckLatestBuild bool
|
|
)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
debugTriggeredBy = "timer"
|
|
case <-workspaceNotifyCh:
|
|
debugTriggeredBy = "workspace"
|
|
onlyCheckLatestBuild = true
|
|
case <-notifyCh:
|
|
debugTriggeredBy = "log"
|
|
t.Reset(recheckInterval)
|
|
}
|
|
|
|
agents, err := api.Database.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, waws.WorkspaceTable.ID)
|
|
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
|
|
if xerrors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
logger.Warn(ctx, "failed to get workspace agents in latest build", slog.Error(err))
|
|
continue
|
|
}
|
|
// If the agent is no longer in the latest build, we can stop after
|
|
// checking once.
|
|
keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == waws.WorkspaceAgent.ID })
|
|
|
|
logger.Debug(
|
|
ctx,
|
|
"checking for new logs",
|
|
slog.F("triggered_by", debugTriggeredBy),
|
|
slog.F("only_check_latest_build", onlyCheckLatestBuild),
|
|
slog.F("keep_going", keepGoing),
|
|
slog.F("last_sent_log_id", lastSentLogID),
|
|
slog.F("workspace_has_agents", len(agents) > 0),
|
|
)
|
|
|
|
if onlyCheckLatestBuild && keepGoing {
|
|
continue
|
|
}
|
|
|
|
logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
|
|
AgentID: waws.WorkspaceAgent.ID,
|
|
CreatedAfter: lastSentLogID,
|
|
})
|
|
if err != nil {
|
|
if xerrors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
logger.Warn(ctx, "failed to get workspace agent logs after", slog.Error(err))
|
|
continue
|
|
}
|
|
if len(logs) == 0 {
|
|
// Just keep listening - more logs might come in the future!
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case bufferedLogs <- logs:
|
|
lastSentLogID = logs[len(logs)-1].ID
|
|
}
|
|
}
|
|
}()
|
|
defer func() {
|
|
// Ensure that we don't return until the goroutine has exited.
|
|
//nolint:revive // Consume channel to wait until it's closed.
|
|
for range bufferedLogs {
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Debug(ctx, "job logs context canceled")
|
|
return
|
|
case logs, ok := <-bufferedLogs:
|
|
if !ok {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Debug(ctx, "job logs context canceled")
|
|
default:
|
|
logger.Debug(ctx, "reached the end of published logs")
|
|
}
|
|
return
|
|
}
|
|
err = encoder.Encode(convertWorkspaceAgentLogs(logs))
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// @Summary Get listening ports for workspace agent
|
|
// @ID get-listening-ports-for-workspace-agent
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Success 200 {object} codersdk.WorkspaceAgentListeningPortsResponse
|
|
// @Router /workspaceagents/{workspaceagent}/listening-ports [get]
|
|
func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
waws := httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
|
|
// If the agent is unreachable, the request will hang. Assume that if we
|
|
// don't get a response after 30s that the agent is unreachable.
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(), *api.TailnetCoordinator.Load(), waws.WorkspaceAgent, nil, nil, nil, api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
|
|
})
|
|
return
|
|
}
|
|
|
|
agentConn, release, err := api.agentProvider.AgentConn(ctx, waws.WorkspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error dialing workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer release()
|
|
|
|
portsResponse, err := agentConn.ListeningPorts(ctx)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching listening ports.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Get a list of ports that are in-use by applications.
|
|
apps, err := api.Database.GetWorkspaceAppsByAgentID(ctx, waws.WorkspaceAgent.ID)
|
|
if xerrors.Is(err, sql.ErrNoRows) {
|
|
apps = []database.WorkspaceApp{}
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace apps.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
appPorts := make(map[uint16]struct{}, len(apps))
|
|
for _, app := range apps {
|
|
if !app.Url.Valid || app.Url.String == "" {
|
|
continue
|
|
}
|
|
u, err := url.Parse(app.Url.String)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
port := u.Port()
|
|
if port == "" {
|
|
continue
|
|
}
|
|
portNum, err := strconv.ParseUint(port, 10, 16)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if portNum < 1 || portNum > 65535 {
|
|
continue
|
|
}
|
|
appPorts[uint16(portNum)] = struct{}{}
|
|
}
|
|
|
|
// Filter out ports that are globally blocked, in-use by applications, or
|
|
// common non-HTTP ports such as databases, FTP, SSH, etc.
|
|
filteredPorts := make([]codersdk.WorkspaceAgentListeningPort, 0, len(portsResponse.Ports))
|
|
for _, port := range portsResponse.Ports {
|
|
if port.Port < workspacesdk.AgentMinimumListeningPort {
|
|
continue
|
|
}
|
|
if _, ok := appPorts[port.Port]; ok {
|
|
continue
|
|
}
|
|
if _, ok := workspacesdk.AgentIgnoredListeningPorts[port.Port]; ok {
|
|
continue
|
|
}
|
|
filteredPorts = append(filteredPorts, port)
|
|
}
|
|
|
|
portsResponse.Ports = filteredPorts
|
|
httpapi.Write(ctx, rw, http.StatusOK, portsResponse)
|
|
}
|
|
|
|
// @Summary Watch workspace agent for container updates.
|
|
// @ID watch-workspace-agent-for-container-updates
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Success 200 {object} codersdk.WorkspaceAgentListContainersResponse
|
|
// @Router /workspaceagents/{workspaceagent}/containers/watch [get]
|
|
func (api *API) watchWorkspaceAgentContainers(rw http.ResponseWriter, r *http.Request) {
|
|
var (
|
|
ctx = r.Context()
|
|
waws = httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
logger = api.Logger.Named("agent_container_watcher").With(slog.F("agent_id", waws.WorkspaceAgent.ID))
|
|
)
|
|
|
|
// If the agent is unreachable, the request will hang. Assume that if we
|
|
// don't get a response after 30s that the agent is unreachable.
|
|
dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer dialCancel()
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(),
|
|
*api.TailnetCoordinator.Load(),
|
|
waws.WorkspaceAgent,
|
|
nil,
|
|
nil,
|
|
nil,
|
|
api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
|
|
})
|
|
return
|
|
}
|
|
|
|
agentConn, release, err := api.agentProvider.AgentConn(dialCtx, waws.WorkspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error dialing workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer release()
|
|
|
|
containersCh, closer, err := agentConn.WatchContainers(ctx, logger)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error watching agent's containers.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer closer.Close()
|
|
|
|
conn, err := websocket.Accept(rw, r, nil)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to upgrade connection to websocket.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(r.Context())
|
|
defer cancel()
|
|
|
|
// Here we close the websocket for reading, so that the websocket library will handle pings and
|
|
// close frames.
|
|
_ = conn.CloseRead(context.Background())
|
|
|
|
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageText)
|
|
defer wsNetConn.Close()
|
|
|
|
go httpapi.HeartbeatClose(ctx, logger, cancel, conn)
|
|
|
|
encoder := json.NewEncoder(wsNetConn)
|
|
|
|
for {
|
|
select {
|
|
case <-api.ctx.Done():
|
|
return
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case containers, ok := <-containersCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if err := encoder.Encode(containers); err != nil {
|
|
api.Logger.Error(ctx, "encode containers", slog.Error(err))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// @Summary Get running containers for workspace agent
|
|
// @ID get-running-containers-for-workspace-agent
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Param label query string true "Labels" format(key=value)
|
|
// @Success 200 {object} codersdk.WorkspaceAgentListContainersResponse
|
|
// @Router /workspaceagents/{workspaceagent}/containers [get]
|
|
func (api *API) workspaceAgentListContainers(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
waws := httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
|
|
labelParam, ok := r.URL.Query()["label"]
|
|
if !ok {
|
|
labelParam = []string{}
|
|
}
|
|
labels := make(map[string]string, len(labelParam)/2)
|
|
for _, label := range labelParam {
|
|
kvs := strings.Split(label, "=")
|
|
if len(kvs) != 2 {
|
|
httpapi.Write(r.Context(), rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Invalid label format",
|
|
Detail: "Labels must be in the format key=value",
|
|
})
|
|
return
|
|
}
|
|
labels[kvs[0]] = kvs[1]
|
|
}
|
|
|
|
// If the agent is unreachable, the request will hang. Assume that if we
|
|
// don't get a response after 30s that the agent is unreachable.
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(),
|
|
*api.TailnetCoordinator.Load(),
|
|
waws.WorkspaceAgent,
|
|
nil,
|
|
nil,
|
|
nil,
|
|
api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
|
|
})
|
|
return
|
|
}
|
|
|
|
agentConn, release, err := api.agentProvider.AgentConn(ctx, waws.WorkspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error dialing workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer release()
|
|
|
|
// Get a list of containers that the agent is able to detect
|
|
cts, err := agentConn.ListContainers(ctx)
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
httpapi.Write(ctx, rw, http.StatusRequestTimeout, codersdk.Response{
|
|
Message: "Failed to fetch containers from agent.",
|
|
Detail: "Request timed out.",
|
|
})
|
|
return
|
|
}
|
|
// If the agent returns a codersdk.Error, we can return that directly.
|
|
if cerr, ok := codersdk.AsError(err); ok {
|
|
httpapi.Write(ctx, rw, cerr.StatusCode(), cerr.Response)
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching containers.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Filter in-place by labels
|
|
cts.Containers = slices.DeleteFunc(cts.Containers, func(ct codersdk.WorkspaceAgentContainer) bool {
|
|
return !maputil.Subset(labels, ct.Labels)
|
|
})
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, cts)
|
|
}
|
|
|
|
// @Summary Delete devcontainer for workspace agent
|
|
// @ID delete-devcontainer-for-workspace-agent
|
|
// @Security CoderSessionToken
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Param devcontainer path string true "Devcontainer ID"
|
|
// @Success 204
|
|
// @Router /workspaceagents/{workspaceagent}/containers/devcontainers/{devcontainer} [delete]
|
|
func (api *API) workspaceAgentDeleteDevcontainer(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
waws := httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
|
|
if !api.Authorize(r, policy.ActionUpdate, waws.WorkspaceTable) {
|
|
httpapi.Forbidden(rw)
|
|
return
|
|
}
|
|
|
|
devcontainer := chi.URLParam(r, "devcontainer")
|
|
if devcontainer == "" {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Devcontainer ID is required.",
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "devcontainer", Detail: "Devcontainer ID is required."},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(),
|
|
*api.TailnetCoordinator.Load(),
|
|
waws.WorkspaceAgent,
|
|
nil,
|
|
nil,
|
|
nil,
|
|
api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
|
|
})
|
|
return
|
|
}
|
|
|
|
// If the agent is unreachable, the request will hang. Assume that if we
|
|
// don't get a response after 30s that the agent is unreachable.
|
|
dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer dialCancel()
|
|
agentConn, release, err := api.agentProvider.AgentConn(dialCtx, waws.WorkspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error dialing workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer release()
|
|
|
|
if err = agentConn.DeleteDevcontainer(ctx, devcontainer); err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
httpapi.Write(ctx, rw, http.StatusRequestTimeout, codersdk.Response{
|
|
Message: "Failed to delete devcontainer from agent.",
|
|
Detail: "Request timed out.",
|
|
})
|
|
return
|
|
}
|
|
// If the agent returns a codersdk.Error, we can return that directly.
|
|
if cerr, ok := codersdk.AsError(err); ok {
|
|
httpapi.Write(ctx, rw, cerr.StatusCode(), cerr.Response)
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error deleting devcontainer.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusNoContent, nil)
|
|
}
|
|
|
|
// @Summary Recreate devcontainer for workspace agent
|
|
// @ID recreate-devcontainer-for-workspace-agent
|
|
// @Security CoderSessionToken
|
|
// @Tags Agents
|
|
// @Produce json
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Param devcontainer path string true "Devcontainer ID"
|
|
// @Success 202 {object} codersdk.Response
|
|
// @Router /workspaceagents/{workspaceagent}/containers/devcontainers/{devcontainer}/recreate [post]
|
|
func (api *API) workspaceAgentRecreateDevcontainer(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
waws := httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
|
|
devcontainer := chi.URLParam(r, "devcontainer")
|
|
if devcontainer == "" {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Devcontainer ID is required.",
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "devcontainer", Detail: "Devcontainer ID is required."},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(),
|
|
*api.TailnetCoordinator.Load(),
|
|
waws.WorkspaceAgent,
|
|
nil,
|
|
nil,
|
|
nil,
|
|
api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
|
|
})
|
|
return
|
|
}
|
|
|
|
// If the agent is unreachable, the request will hang. Assume that if we
|
|
// don't get a response after 30s that the agent is unreachable.
|
|
dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer dialCancel()
|
|
agentConn, release, err := api.agentProvider.AgentConn(dialCtx, waws.WorkspaceAgent.ID)
|
|
if err != nil {
|
|
httpapi.Write(dialCtx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error dialing workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer release()
|
|
|
|
m, err := agentConn.RecreateDevcontainer(ctx, devcontainer)
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
httpapi.Write(ctx, rw, http.StatusRequestTimeout, codersdk.Response{
|
|
Message: "Failed to recreate devcontainer from agent.",
|
|
Detail: "Request timed out.",
|
|
})
|
|
return
|
|
}
|
|
// If the agent returns a codersdk.Error, we can return that directly.
|
|
if cerr, ok := codersdk.AsError(err); ok {
|
|
httpapi.Write(ctx, rw, cerr.StatusCode(), cerr.Response)
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error recreating devcontainer.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusAccepted, m)
|
|
}
|
|
|
|
// @Summary Get connection info for workspace agent
|
|
// @ID get-connection-info-for-workspace-agent
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Success 200 {object} workspacesdk.AgentConnectionInfo
|
|
// @Router /workspaceagents/{workspaceagent}/connection [get]
|
|
func (api *API) workspaceAgentConnection(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, workspacesdk.AgentConnectionInfo{
|
|
DERPMap: api.DERPMap(),
|
|
DERPForceWebSockets: api.DeploymentValues.DERP.Config.ForceWebSockets.Value(),
|
|
DisableDirectConnections: api.DeploymentValues.DERP.Config.BlockDirect.Value(),
|
|
HostnameSuffix: api.DeploymentValues.WorkspaceHostnameSuffix.Value(),
|
|
})
|
|
}
|
|
|
|
// workspaceAgentConnectionGeneric is the same as workspaceAgentConnection but
|
|
// without the workspaceagent path parameter.
|
|
//
|
|
// @Summary Get connection info for workspace agent generic
|
|
// @ID get-connection-info-for-workspace-agent-generic
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Success 200 {object} workspacesdk.AgentConnectionInfo
|
|
// @Router /workspaceagents/connection [get]
|
|
// @x-apidocgen {"skip": true}
|
|
func (api *API) workspaceAgentConnectionGeneric(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, workspacesdk.AgentConnectionInfo{
|
|
DERPMap: api.DERPMap(),
|
|
DERPForceWebSockets: api.DeploymentValues.DERP.Config.ForceWebSockets.Value(),
|
|
DisableDirectConnections: api.DeploymentValues.DERP.Config.BlockDirect.Value(),
|
|
HostnameSuffix: api.DeploymentValues.WorkspaceHostnameSuffix.Value(),
|
|
})
|
|
}
|
|
|
|
// @Summary Get DERP map updates
|
|
// @ID get-derp-map-updates
|
|
// @Security CoderSessionToken
|
|
// @Tags Agents
|
|
// @Success 101
|
|
// @Router /derp-map [get]
|
|
func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
api.WebsocketWaitMutex.Lock()
|
|
api.WebsocketWaitGroup.Add(1)
|
|
api.WebsocketWaitMutex.Unlock()
|
|
defer api.WebsocketWaitGroup.Done()
|
|
|
|
ws, err := websocket.Accept(rw, r, nil)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to accept websocket.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
encoder := wsjson.NewEncoder[*tailcfg.DERPMap](ws, websocket.MessageBinary)
|
|
defer encoder.Close(websocket.StatusGoingAway)
|
|
|
|
// Log the request immediately instead of after it completes.
|
|
if rl := loggermw.RequestLoggerFromContext(ctx); rl != nil {
|
|
rl.WriteLog(ctx, http.StatusAccepted)
|
|
}
|
|
|
|
go func(ctx context.Context) {
|
|
// TODO(mafredri): Is this too frequent? Use separate ping disconnect timeout?
|
|
t := time.NewTicker(api.AgentConnectionUpdateFrequency)
|
|
defer t.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
err := ws.Ping(ctx)
|
|
cancel()
|
|
if err != nil {
|
|
_ = ws.Close(websocket.StatusGoingAway, "ping failed")
|
|
return
|
|
}
|
|
}
|
|
}(ctx)
|
|
|
|
ticker := time.NewTicker(api.Options.DERPMapUpdateFrequency)
|
|
defer ticker.Stop()
|
|
|
|
var lastDERPMap *tailcfg.DERPMap
|
|
for {
|
|
derpMap := api.DERPMap()
|
|
if lastDERPMap == nil || !tailnet.CompareDERPMaps(lastDERPMap, derpMap) {
|
|
err := encoder.Encode(derpMap)
|
|
if err != nil {
|
|
return
|
|
}
|
|
lastDERPMap = derpMap
|
|
}
|
|
|
|
ticker.Reset(api.Options.DERPMapUpdateFrequency)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-api.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
// workspaceAgentClientCoordinate accepts a WebSocket that reads node network updates.
|
|
// After accept a PubSub starts listening for new connection node updates
|
|
// which are written to the WebSocket.
|
|
//
|
|
// @Summary Coordinate workspace agent
|
|
// @ID coordinate-workspace-agent
|
|
// @Security CoderSessionToken
|
|
// @Tags Agents
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Success 101
|
|
// @Router /workspaceagents/{workspaceagent}/coordinate [get]
|
|
func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
// Ensure the database is reachable before proceeding.
|
|
_, err := api.Database.Ping(ctx)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: codersdk.DatabaseNotReachable,
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// This route accepts user API key auth and workspace proxy auth. The moon actor has
|
|
// full permissions so should be able to pass this authz check.
|
|
waws := httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
if !api.Authorize(r, policy.ActionSSH, waws) {
|
|
httpapi.ResourceNotFound(rw)
|
|
return
|
|
}
|
|
|
|
// This is used by Enterprise code to control the functionality of this route.
|
|
// Namely, disabling the route using `CODER_BROWSER_ONLY`.
|
|
override := api.WorkspaceClientCoordinateOverride.Load()
|
|
if override != nil {
|
|
overrideFunc := *override
|
|
if overrideFunc != nil && overrideFunc(rw) {
|
|
return
|
|
}
|
|
}
|
|
|
|
version := "1.0"
|
|
qv := r.URL.Query().Get("version")
|
|
if qv != "" {
|
|
version = qv
|
|
}
|
|
if err := proto.CurrentVersion.Validate(version); err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Unknown or unsupported API version",
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "version", Detail: err.Error()},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
peerID, err := api.handleResumeToken(ctx, rw, r)
|
|
if err != nil {
|
|
// handleResumeToken has already written the response.
|
|
return
|
|
}
|
|
|
|
api.WebsocketWaitMutex.Lock()
|
|
api.WebsocketWaitGroup.Add(1)
|
|
api.WebsocketWaitMutex.Unlock()
|
|
defer api.WebsocketWaitGroup.Done()
|
|
|
|
conn, err := websocket.Accept(rw, r, nil)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to accept websocket.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
|
|
defer wsNetConn.Close()
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
go httpapi.HeartbeatClose(ctx, api.Logger, cancel, conn)
|
|
|
|
defer conn.Close(websocket.StatusNormalClosure, "")
|
|
err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{
|
|
Name: "client",
|
|
ID: peerID,
|
|
Auth: tailnet.ClientCoordinateeAuth{
|
|
AgentID: waws.WorkspaceAgent.ID,
|
|
},
|
|
})
|
|
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
|
|
_ = conn.Close(websocket.StatusInternalError, err.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
// handleResumeToken accepts a resume_token query parameter to use the same peer ID
|
|
func (api *API) handleResumeToken(ctx context.Context, rw http.ResponseWriter, r *http.Request) (peerID uuid.UUID, err error) {
|
|
peerID = uuid.New()
|
|
resumeToken := r.URL.Query().Get("resume_token")
|
|
if resumeToken != "" {
|
|
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken)
|
|
// If the token is missing the key ID, it's probably an old token in which
|
|
// case we just want to generate a new peer ID.
|
|
switch {
|
|
case xerrors.Is(err, jwtutils.ErrMissingKeyID):
|
|
peerID = uuid.New()
|
|
err = nil
|
|
case err != nil:
|
|
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
|
|
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
|
|
Detail: err.Error(),
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
|
|
},
|
|
})
|
|
return peerID, err
|
|
default:
|
|
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
|
|
slog.F("peer_id", peerID.String()))
|
|
}
|
|
}
|
|
return peerID, err
|
|
}
|
|
|
|
// @Summary Post workspace agent log source
|
|
// @ID post-workspace-agent-log-source
|
|
// @Security CoderSessionToken
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param request body agentsdk.PostLogSourceRequest true "Log source request"
|
|
// @Success 200 {object} codersdk.WorkspaceAgentLogSource
|
|
// @Router /workspaceagents/me/log-source [post]
|
|
func (api *API) workspaceAgentPostLogSource(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
var req agentsdk.PostLogSourceRequest
|
|
if !httpapi.Read(ctx, rw, r, &req) {
|
|
return
|
|
}
|
|
|
|
workspaceAgent := httpmw.WorkspaceAgent(r)
|
|
|
|
sources, err := api.Database.InsertWorkspaceAgentLogSources(ctx, database.InsertWorkspaceAgentLogSourcesParams{
|
|
WorkspaceAgentID: workspaceAgent.ID,
|
|
CreatedAt: dbtime.Now(),
|
|
ID: []uuid.UUID{req.ID},
|
|
DisplayName: []string{req.DisplayName},
|
|
Icon: []string{req.Icon},
|
|
})
|
|
if err != nil {
|
|
if database.IsUniqueViolation(err, "workspace_agent_log_sources_pkey") {
|
|
httpapi.Write(ctx, rw, http.StatusCreated, codersdk.WorkspaceAgentLogSource{
|
|
WorkspaceAgentID: workspaceAgent.ID,
|
|
CreatedAt: dbtime.Now(),
|
|
ID: req.ID,
|
|
DisplayName: req.DisplayName,
|
|
Icon: req.Icon,
|
|
})
|
|
return
|
|
}
|
|
httpapi.InternalServerError(rw, err)
|
|
return
|
|
}
|
|
|
|
if len(sources) != 1 {
|
|
httpapi.InternalServerError(rw, xerrors.Errorf("database should've returned 1 row, got %d", len(sources)))
|
|
return
|
|
}
|
|
|
|
apiSource := convertLogSources(sources)[0]
|
|
|
|
httpapi.Write(ctx, rw, http.StatusCreated, apiSource)
|
|
}
|
|
|
|
// @Summary Get workspace agent reinitialization
|
|
// @ID get-workspace-agent-reinitialization
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Success 200 {object} agentsdk.ReinitializationEvent
|
|
// @Router /workspaceagents/me/reinit [get]
|
|
func (api *API) workspaceAgentReinit(rw http.ResponseWriter, r *http.Request) {
|
|
// Allow us to interrupt watch via cancel.
|
|
ctx, cancel := context.WithCancel(r.Context())
|
|
defer cancel()
|
|
r = r.WithContext(ctx) // Rewire context for SSE cancellation.
|
|
|
|
workspaceAgent := httpmw.WorkspaceAgent(r)
|
|
log := api.Logger.Named("workspace_agent_reinit_watcher").With(
|
|
slog.F("workspace_agent_id", workspaceAgent.ID),
|
|
)
|
|
|
|
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
|
|
if err != nil {
|
|
log.Error(ctx, "failed to retrieve workspace from agent token", slog.Error(err))
|
|
httpapi.InternalServerError(rw, xerrors.New("failed to determine workspace from agent token"))
|
|
}
|
|
|
|
log.Info(ctx, "agent waiting for reinit instruction")
|
|
|
|
reinitEvents := make(chan agentsdk.ReinitializationEvent)
|
|
cancel, err = prebuilds.NewPubsubWorkspaceClaimListener(api.Pubsub, log).ListenForWorkspaceClaims(ctx, workspace.ID, reinitEvents)
|
|
if err != nil {
|
|
log.Error(ctx, "subscribe to prebuild claimed channel", slog.Error(err))
|
|
httpapi.InternalServerError(rw, xerrors.New("failed to subscribe to prebuild claimed channel"))
|
|
return
|
|
}
|
|
defer cancel()
|
|
|
|
transmitter := agentsdk.NewSSEAgentReinitTransmitter(log, rw, r)
|
|
|
|
err = transmitter.Transmit(ctx, reinitEvents)
|
|
switch {
|
|
case errors.Is(err, agentsdk.ErrTransmissionSourceClosed):
|
|
log.Info(ctx, "agent reinitialization subscription closed", slog.F("workspace_agent_id", workspaceAgent.ID))
|
|
case errors.Is(err, agentsdk.ErrTransmissionTargetClosed):
|
|
log.Info(ctx, "agent connection closed", slog.F("workspace_agent_id", workspaceAgent.ID))
|
|
case errors.Is(err, context.Canceled):
|
|
log.Info(ctx, "agent reinitialization", slog.Error(err))
|
|
case err != nil:
|
|
log.Error(ctx, "failed to stream agent reinit events", slog.Error(err))
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error streaming agent reinitialization events.",
|
|
Detail: err.Error(),
|
|
})
|
|
}
|
|
}
|
|
|
|
// convertProvisionedApps converts applications that are in the middle of provisioning process.
|
|
// It means that they may not have an agent or workspace assigned (dry-run job).
|
|
func convertProvisionedApps(dbApps []database.WorkspaceApp) []codersdk.WorkspaceApp {
|
|
return db2sdk.Apps(dbApps, []database.WorkspaceAppStatus{}, database.WorkspaceAgent{}, "", database.WorkspaceTable{})
|
|
}
|
|
|
|
func convertLogSources(dbLogSources []database.WorkspaceAgentLogSource) []codersdk.WorkspaceAgentLogSource {
|
|
logSources := make([]codersdk.WorkspaceAgentLogSource, 0)
|
|
for _, dbLogSource := range dbLogSources {
|
|
logSources = append(logSources, codersdk.WorkspaceAgentLogSource{
|
|
ID: dbLogSource.ID,
|
|
DisplayName: dbLogSource.DisplayName,
|
|
WorkspaceAgentID: dbLogSource.WorkspaceAgentID,
|
|
CreatedAt: dbLogSource.CreatedAt,
|
|
Icon: dbLogSource.Icon,
|
|
})
|
|
}
|
|
return logSources
|
|
}
|
|
|
|
func convertScripts(dbScripts []database.WorkspaceAgentScript) []codersdk.WorkspaceAgentScript {
|
|
scripts := make([]codersdk.WorkspaceAgentScript, 0)
|
|
for _, dbScript := range dbScripts {
|
|
scripts = append(scripts, codersdk.WorkspaceAgentScript{
|
|
ID: dbScript.ID,
|
|
LogPath: dbScript.LogPath,
|
|
LogSourceID: dbScript.LogSourceID,
|
|
Script: dbScript.Script,
|
|
Cron: dbScript.Cron,
|
|
RunOnStart: dbScript.RunOnStart,
|
|
RunOnStop: dbScript.RunOnStop,
|
|
StartBlocksLogin: dbScript.StartBlocksLogin,
|
|
Timeout: time.Duration(dbScript.TimeoutSeconds) * time.Second,
|
|
DisplayName: dbScript.DisplayName,
|
|
})
|
|
}
|
|
return scripts
|
|
}
|
|
|
|
// @Summary Watch for workspace agent metadata updates
|
|
// @ID watch-for-workspace-agent-metadata-updates
|
|
// @Security CoderSessionToken
|
|
// @Tags Agents
|
|
// @Success 200 "Success"
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Router /workspaceagents/{workspaceagent}/watch-metadata [get]
|
|
// @x-apidocgen {"skip": true}
|
|
// @Deprecated Use /workspaceagents/{workspaceagent}/watch-metadata-ws instead
|
|
func (api *API) watchWorkspaceAgentMetadataSSE(rw http.ResponseWriter, r *http.Request) {
|
|
api.watchWorkspaceAgentMetadata(rw, r, httpapi.ServerSentEventSender)
|
|
}
|
|
|
|
// @Summary Watch for workspace agent metadata updates via WebSockets
|
|
// @ID watch-for-workspace-agent-metadata-updates-via-websockets
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Success 200 {object} codersdk.ServerSentEvent
|
|
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
|
|
// @Router /workspaceagents/{workspaceagent}/watch-metadata-ws [get]
|
|
// @x-apidocgen {"skip": true}
|
|
func (api *API) watchWorkspaceAgentMetadataWS(rw http.ResponseWriter, r *http.Request) {
|
|
api.watchWorkspaceAgentMetadata(rw, r, httpapi.OneWayWebSocketEventSender(api.Logger))
|
|
}
|
|
|
|
func (api *API) watchWorkspaceAgentMetadata(
|
|
rw http.ResponseWriter,
|
|
r *http.Request,
|
|
connect httpapi.EventSender,
|
|
) {
|
|
// Allow us to interrupt watch via cancel.
|
|
ctx, cancel := context.WithCancel(r.Context())
|
|
defer cancel()
|
|
r = r.WithContext(ctx) // Rewire context for SSE cancellation.
|
|
|
|
waws := httpmw.WorkspaceAgentAndWorkspaceParam(r)
|
|
agentIDEncoded := make([]byte, metadatabatcher.UUIDBase64Size)
|
|
err := metadatabatcher.EncodeAgentID(waws.WorkspaceAgent.ID, agentIDEncoded)
|
|
if err != nil {
|
|
httpapi.InternalServerError(rw, err)
|
|
return
|
|
}
|
|
log := api.Logger.Named("workspace_metadata_watcher").With(
|
|
slog.F("workspace_agent_id", waws.WorkspaceAgent.ID),
|
|
slog.F("workspace_id", waws.WorkspaceTable.ID),
|
|
)
|
|
|
|
// Send metadata on updates, we must ensure subscription before sending
|
|
// initial metadata to guarantee that events in-between are not missed.
|
|
// The channel carries no data - it's just a signal to fetch all metadata.
|
|
update := make(chan struct{}, 1)
|
|
|
|
// Subscribe to the global batched metadata channel.
|
|
// The batcher publishes only to this channel to achieve O(1) NOTIFY scaling.
|
|
cancelBatchSub, err := api.Pubsub.Subscribe(metadatabatcher.MetadataBatchPubsubChannel, func(_ context.Context, byt []byte) {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
if len(byt)%metadatabatcher.UUIDBase64Size != 0 {
|
|
log.Error(ctx, "invalid batched pubsub message, pubsub message length was not a multiple of encoded agent UUID length", slog.Error(err))
|
|
return
|
|
}
|
|
|
|
// Compare each encoded agentID to our encoded agent ID.
|
|
for i := 0; i < len(byt); i += metadatabatcher.UUIDBase64Size {
|
|
if !bytes.Equal(byt[i:i+metadatabatcher.UUIDBase64Size], agentIDEncoded) {
|
|
continue
|
|
}
|
|
|
|
log.Debug(ctx, "received metadata update from batch channel",
|
|
slog.F("agent_id", waws.WorkspaceAgent.ID),
|
|
slog.F("batch_size", len(byt)/metadatabatcher.UUIDBase64Size),
|
|
)
|
|
|
|
// Signal to re-fetch all metadata for this agent.
|
|
// Batch notifications don't include which keys changed, so we
|
|
// always fetch all keys for this agent.
|
|
// Attempt to read from the channel first so that we do not block on the write.
|
|
select {
|
|
case <-update:
|
|
default:
|
|
}
|
|
update <- struct{}{}
|
|
break
|
|
}
|
|
})
|
|
if err != nil {
|
|
httpapi.InternalServerError(rw, err)
|
|
return
|
|
}
|
|
defer cancelBatchSub()
|
|
|
|
// We always use the original Request context because it contains
|
|
// the RBAC actor.
|
|
initialMD, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
|
|
WorkspaceAgentID: waws.WorkspaceAgent.ID,
|
|
Keys: nil,
|
|
})
|
|
if err != nil {
|
|
// If we can't successfully pull the initial metadata, pubsub
|
|
// updates will be no-op so we may as well terminate the
|
|
// connection early.
|
|
httpapi.InternalServerError(rw, err)
|
|
return
|
|
}
|
|
|
|
log.Debug(ctx, "got initial metadata", slog.F("num", len(initialMD)))
|
|
|
|
metadataMap := make(map[string]database.WorkspaceAgentMetadatum, len(initialMD))
|
|
for _, datum := range initialMD {
|
|
metadataMap[datum.Key] = datum
|
|
}
|
|
//nolint:ineffassign // Release memory.
|
|
initialMD = nil
|
|
|
|
sendEvent, senderClosed, err := connect(rw, r)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error setting up server-sent events.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
// Prevent handler from returning until the sender is closed.
|
|
defer func() {
|
|
cancel()
|
|
<-senderClosed
|
|
}()
|
|
// Synchronize cancellation from SSE -> context, this lets us simplify the
|
|
// cancellation logic.
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-senderClosed:
|
|
cancel()
|
|
}
|
|
}()
|
|
|
|
var lastSend time.Time
|
|
sendMetadata := func() {
|
|
lastSend = time.Now()
|
|
values := maps.Values(metadataMap)
|
|
|
|
log.Debug(ctx, "sending metadata", slog.F("num", len(values)))
|
|
|
|
_ = sendEvent(codersdk.ServerSentEvent{
|
|
Type: codersdk.ServerSentEventTypeData,
|
|
Data: convertWorkspaceAgentMetadata(values),
|
|
})
|
|
}
|
|
|
|
// We send updates exactly every second.
|
|
const sendInterval = time.Second * 1
|
|
sendTicker := time.NewTicker(sendInterval)
|
|
defer sendTicker.Stop()
|
|
|
|
// Log the request immediately instead of after it completes.
|
|
if rl := loggermw.RequestLoggerFromContext(ctx); rl != nil {
|
|
rl.WriteLog(ctx, http.StatusAccepted)
|
|
}
|
|
|
|
// Send initial metadata.
|
|
sendMetadata()
|
|
|
|
// Fetch updated metadata keys as they come in.
|
|
fetchedMetadata := make(chan []database.WorkspaceAgentMetadatum)
|
|
go func() {
|
|
defer close(fetchedMetadata)
|
|
defer cancel()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-update:
|
|
// Batch notification received - fetch all metadata for this agent.
|
|
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
|
|
WorkspaceAgentID: waws.WorkspaceAgent.ID,
|
|
Keys: nil, // nil means fetch all keys
|
|
})
|
|
if err != nil {
|
|
if !database.IsQueryCanceledError(err) {
|
|
log.Error(ctx, "failed to get metadata", slog.Error(err))
|
|
_ = sendEvent(codersdk.ServerSentEvent{
|
|
Type: codersdk.ServerSentEventTypeError,
|
|
Data: codersdk.Response{
|
|
Message: "Failed to get metadata.",
|
|
Detail: err.Error(),
|
|
},
|
|
})
|
|
}
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
// We want to block here to avoid constantly pinging the
|
|
// database when the metadata isn't being processed.
|
|
case fetchedMetadata <- md:
|
|
log.Debug(ctx, "fetched all metadata after batch update", slog.F("num", len(md)))
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
defer func() {
|
|
<-fetchedMetadata
|
|
}()
|
|
|
|
pendingChanges := true
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case md, ok := <-fetchedMetadata:
|
|
if !ok {
|
|
return
|
|
}
|
|
for _, datum := range md {
|
|
metadataMap[datum.Key] = datum
|
|
}
|
|
pendingChanges = true
|
|
continue
|
|
case <-sendTicker.C:
|
|
// We send an update even if there's no change every 5 seconds
|
|
// to ensure that the frontend always has an accurate "Result.Age".
|
|
if !pendingChanges && time.Since(lastSend) < 5*time.Second {
|
|
continue
|
|
}
|
|
pendingChanges = false
|
|
}
|
|
|
|
sendMetadata()
|
|
}
|
|
}
|
|
|
|
func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []codersdk.WorkspaceAgentMetadata {
|
|
// Sort the input database slice by DisplayOrder and then by Key before processing
|
|
sort.Slice(db, func(i, j int) bool {
|
|
if db[i].DisplayOrder == db[j].DisplayOrder {
|
|
return db[i].Key < db[j].Key
|
|
}
|
|
return db[i].DisplayOrder < db[j].DisplayOrder
|
|
})
|
|
|
|
// An empty array is easier for clients to handle than a null.
|
|
result := make([]codersdk.WorkspaceAgentMetadata, len(db))
|
|
for i, datum := range db {
|
|
result[i] = codersdk.WorkspaceAgentMetadata{
|
|
Result: codersdk.WorkspaceAgentMetadataResult{
|
|
Value: datum.Value,
|
|
Error: datum.Error,
|
|
CollectedAt: datum.CollectedAt.UTC(),
|
|
Age: int64(time.Since(datum.CollectedAt).Seconds()),
|
|
},
|
|
Description: codersdk.WorkspaceAgentMetadataDescription{
|
|
DisplayName: datum.DisplayName,
|
|
Key: datum.Key,
|
|
Script: datum.Script,
|
|
Interval: datum.Interval,
|
|
Timeout: datum.Timeout,
|
|
},
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// workspaceAgentsExternalAuth returns an access token for a given URL
|
|
// or finds a provider by ID.
|
|
//
|
|
// @Summary Get workspace agent external auth
|
|
// @ID get-workspace-agent-external-auth
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Agents
|
|
// @Param match query string true "Match"
|
|
// @Param id query string true "Provider ID"
|
|
// @Param listen query bool false "Wait for a new token to be issued"
|
|
// @Success 200 {object} agentsdk.ExternalAuthResponse
|
|
// @Router /workspaceagents/me/external-auth [get]
|
|
func (api *API) workspaceAgentsExternalAuth(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
query := r.URL.Query()
|
|
// Either match or configID must be provided!
|
|
match := query.Get("match")
|
|
if match == "" {
|
|
// Support legacy agents!
|
|
match = query.Get("url")
|
|
}
|
|
id := query.Get("id")
|
|
if match == "" && id == "" {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "'url' or 'id' must be provided!",
|
|
})
|
|
return
|
|
}
|
|
if match != "" && id != "" {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "'url' and 'id' cannot be provided together!",
|
|
})
|
|
return
|
|
}
|
|
|
|
// listen determines if the request will wait for a
|
|
// new token to be issued!
|
|
listen := r.URL.Query().Has("listen")
|
|
|
|
var externalAuthConfig *externalauth.Config
|
|
for _, extAuth := range api.ExternalAuthConfigs {
|
|
if extAuth.ID == id {
|
|
externalAuthConfig = extAuth
|
|
break
|
|
}
|
|
if match == "" || extAuth.Regex == nil {
|
|
continue
|
|
}
|
|
matches := extAuth.Regex.MatchString(match)
|
|
if !matches {
|
|
continue
|
|
}
|
|
externalAuthConfig = extAuth
|
|
}
|
|
if externalAuthConfig == nil {
|
|
detail := "External auth provider not found."
|
|
if len(api.ExternalAuthConfigs) > 0 {
|
|
regexURLs := make([]string, 0, len(api.ExternalAuthConfigs))
|
|
for _, extAuth := range api.ExternalAuthConfigs {
|
|
if extAuth.Regex == nil {
|
|
continue
|
|
}
|
|
regexURLs = append(regexURLs, fmt.Sprintf("%s=%q", extAuth.ID, extAuth.Regex.String()))
|
|
}
|
|
detail = fmt.Sprintf("The configured external auth provider have regex filters that do not match the url. Provider url regex: %s", strings.Join(regexURLs, ","))
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{
|
|
Message: fmt.Sprintf("No matching external auth provider found in Coder for the url %q.", match),
|
|
Detail: detail,
|
|
})
|
|
return
|
|
}
|
|
workspaceAgent := httpmw.WorkspaceAgent(r)
|
|
// We must get the workspace to get the owner ID!
|
|
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to get workspace resource.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to get build.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
workspace, err := api.Database.GetWorkspaceByID(ctx, build.WorkspaceID)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to get workspace.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Pre-check if the caller can read the external auth links for the owner of the
|
|
// workspace. Do this up front because a sql.ErrNoRows is expected if the user is
|
|
// in the flow of authenticating. If no row is present, the auth check is delayed
|
|
// until the user authenticates. It is preferred to reject early.
|
|
if !api.Authorize(r, policy.ActionReadPersonal, rbac.ResourceUserObject(workspace.OwnerID)) {
|
|
httpapi.Forbidden(rw)
|
|
return
|
|
}
|
|
|
|
var previousToken *database.ExternalAuthLink
|
|
// handleRetrying will attempt to continually check for a new token
|
|
// if listen is true. This is useful if an error is encountered in the
|
|
// original single flow.
|
|
//
|
|
// By default, if no errors are encountered, then the single flow response
|
|
// is returned.
|
|
handleRetrying := func(code int, response any) {
|
|
if !listen {
|
|
httpapi.Write(ctx, rw, code, response)
|
|
return
|
|
}
|
|
|
|
api.workspaceAgentsExternalAuthListen(ctx, rw, previousToken, externalAuthConfig, workspace)
|
|
}
|
|
|
|
// This is the URL that will redirect the user with a state token.
|
|
redirectURL, err := api.AccessURL.Parse(fmt.Sprintf("/external-auth/%s", externalAuthConfig.ID))
|
|
if err != nil {
|
|
handleRetrying(http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to parse access URL.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
externalAuthLink, err := api.Database.GetExternalAuthLink(ctx, database.GetExternalAuthLinkParams{
|
|
ProviderID: externalAuthConfig.ID,
|
|
UserID: workspace.OwnerID,
|
|
})
|
|
if err != nil {
|
|
if !errors.Is(err, sql.ErrNoRows) {
|
|
handleRetrying(http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to get external auth link.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
handleRetrying(http.StatusOK, agentsdk.ExternalAuthResponse{
|
|
URL: redirectURL.String(),
|
|
})
|
|
return
|
|
}
|
|
|
|
refreshedLink, err := externalAuthConfig.RefreshToken(ctx, api.Database, externalAuthLink)
|
|
if err != nil && !externalauth.IsInvalidTokenError(err) {
|
|
handleRetrying(http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to refresh external auth token.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if err != nil {
|
|
// Set the previous token so the retry logic will skip validating the
|
|
// same token again. This should only be set if the token is invalid and there
|
|
// was no error. If it is invalid because of an error, then we should recheck.
|
|
previousToken = &refreshedLink
|
|
handleRetrying(http.StatusOK, agentsdk.ExternalAuthResponse{
|
|
URL: redirectURL.String(),
|
|
})
|
|
return
|
|
}
|
|
resp, err := createExternalAuthResponse(externalAuthConfig.Type, refreshedLink.OAuthAccessToken, refreshedLink.OAuthExtra)
|
|
if err != nil {
|
|
handleRetrying(http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to create external auth response.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusOK, resp)
|
|
}
|
|
|
|
func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.ResponseWriter, previous *database.ExternalAuthLink, externalAuthConfig *externalauth.Config, workspace database.Workspace) {
|
|
// Since we're ticking frequently and this sign-in operation is rare,
|
|
// we are OK with polling to avoid the complexity of pubsub.
|
|
ticker, done := api.NewTicker(time.Second)
|
|
defer done()
|
|
// If we have a previous token that is invalid, we should not check this again.
|
|
// This serves to prevent doing excessive unauthorized requests to the external
|
|
// auth provider. For github, this limit is 60 per hour, so saving a call
|
|
// per invalid token can be significant.
|
|
var previousToken database.ExternalAuthLink
|
|
if previous != nil {
|
|
previousToken = *previous
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker:
|
|
}
|
|
externalAuthLink, err := api.Database.GetExternalAuthLink(ctx, database.GetExternalAuthLinkParams{
|
|
ProviderID: externalAuthConfig.ID,
|
|
UserID: workspace.OwnerID,
|
|
})
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
continue
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to get external auth link.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Expiry may be unset if the application doesn't configure tokens
|
|
// to expire.
|
|
// See
|
|
// https://docs.github.com/en/apps/creating-github-apps/authenticating-with-a-github-app/generating-a-user-access-token-for-a-github-app.
|
|
if externalAuthLink.OAuthExpiry.Before(dbtime.Now()) && !externalAuthLink.OAuthExpiry.IsZero() {
|
|
continue
|
|
}
|
|
|
|
// Only attempt to revalidate an oauth token if it has actually changed.
|
|
// No point in trying to validate the same token over and over again.
|
|
if previousToken.OAuthAccessToken == externalAuthLink.OAuthAccessToken &&
|
|
previousToken.OAuthRefreshToken == externalAuthLink.OAuthRefreshToken &&
|
|
previousToken.OAuthExpiry == externalAuthLink.OAuthExpiry {
|
|
continue
|
|
}
|
|
|
|
valid, _, err := externalAuthConfig.ValidateToken(ctx, externalAuthLink.OAuthToken())
|
|
if err != nil {
|
|
api.Logger.Warn(ctx, "failed to validate external auth token",
|
|
slog.F("workspace_owner_id", workspace.OwnerID.String()),
|
|
slog.F("validate_url", externalAuthConfig.ValidateURL),
|
|
slog.Error(err),
|
|
)
|
|
}
|
|
previousToken = externalAuthLink
|
|
if !valid {
|
|
continue
|
|
}
|
|
resp, err := createExternalAuthResponse(externalAuthConfig.Type, externalAuthLink.OAuthAccessToken, externalAuthLink.OAuthExtra)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Failed to create external auth response.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusOK, resp)
|
|
return
|
|
}
|
|
}
|
|
|
|
// @Summary User-scoped tailnet RPC connection
|
|
// @ID user-scoped-tailnet-rpc-connection
|
|
// @Security CoderSessionToken
|
|
// @Tags Agents
|
|
// @Success 101
|
|
// @Router /tailnet [get]
|
|
func (api *API) tailnetRPCConn(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
// This is used by Enterprise code to control the functionality of this route.
|
|
// Namely, disabling the route using `CODER_BROWSER_ONLY`.
|
|
override := api.WorkspaceClientCoordinateOverride.Load()
|
|
if override != nil {
|
|
overrideFunc := *override
|
|
if overrideFunc != nil && overrideFunc(rw) {
|
|
return
|
|
}
|
|
}
|
|
|
|
version := "2.0"
|
|
qv := r.URL.Query().Get("version")
|
|
if qv != "" {
|
|
version = qv
|
|
}
|
|
if err := proto.CurrentVersion.Validate(version); err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Unknown or unsupported API version",
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "version", Detail: err.Error()},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
peerID, err := api.handleResumeToken(ctx, rw, r)
|
|
if err != nil {
|
|
// handleResumeToken has already written the response.
|
|
return
|
|
}
|
|
|
|
// Used to authorize tunnel request
|
|
sshPrep, err := api.HTTPAuth.AuthorizeSQLFilter(r, policy.ActionSSH, rbac.ResourceWorkspace.Type)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error preparing sql filter.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
api.WebsocketWaitMutex.Lock()
|
|
api.WebsocketWaitGroup.Add(1)
|
|
api.WebsocketWaitMutex.Unlock()
|
|
defer api.WebsocketWaitGroup.Done()
|
|
|
|
conn, err := websocket.Accept(rw, r, nil)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to accept websocket.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
|
|
defer wsNetConn.Close()
|
|
defer conn.Close(websocket.StatusNormalClosure, "")
|
|
|
|
// Get user ID for telemetry
|
|
apiKey := httpmw.APIKey(r)
|
|
userID := apiKey.UserID.String()
|
|
|
|
// Store connection telemetry event
|
|
now := time.Now()
|
|
connectionTelemetryEvent := telemetry.UserTailnetConnection{
|
|
ConnectedAt: now,
|
|
DisconnectedAt: nil,
|
|
UserID: userID,
|
|
PeerID: peerID.String(),
|
|
DeviceID: nil,
|
|
DeviceOS: nil,
|
|
CoderDesktopVersion: nil,
|
|
}
|
|
|
|
fillCoderDesktopTelemetry(r, &connectionTelemetryEvent, api.Logger)
|
|
api.Telemetry.Report(&telemetry.Snapshot{
|
|
UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent},
|
|
})
|
|
defer func() {
|
|
// Update telemetry event with disconnection time
|
|
disconnectTime := time.Now()
|
|
connectionTelemetryEvent.DisconnectedAt = &disconnectTime
|
|
api.Telemetry.Report(&telemetry.Snapshot{
|
|
UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent},
|
|
})
|
|
}()
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
go httpapi.HeartbeatClose(ctx, api.Logger, cancel, conn)
|
|
err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{
|
|
Name: "client",
|
|
ID: peerID,
|
|
Auth: tailnet.ClientUserCoordinateeAuth{
|
|
Auth: &rbacAuthorizer{
|
|
sshPrep: sshPrep,
|
|
db: api.Database,
|
|
},
|
|
},
|
|
})
|
|
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
|
|
_ = conn.Close(websocket.StatusInternalError, err.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
// fillCoderDesktopTelemetry fills out the provided event based on a Coder Desktop telemetry header on the request, if
|
|
// present.
|
|
func fillCoderDesktopTelemetry(r *http.Request, event *telemetry.UserTailnetConnection, logger slog.Logger) {
|
|
// Parse desktop telemetry from header if it exists
|
|
desktopTelemetryHeader := r.Header.Get(codersdk.CoderDesktopTelemetryHeader)
|
|
if desktopTelemetryHeader != "" {
|
|
var telemetryData codersdk.CoderDesktopTelemetry
|
|
if err := telemetryData.FromHeader(desktopTelemetryHeader); err == nil {
|
|
// Only set fields if they aren't empty
|
|
if telemetryData.DeviceID != "" {
|
|
event.DeviceID = &telemetryData.DeviceID
|
|
}
|
|
if telemetryData.DeviceOS != "" {
|
|
event.DeviceOS = &telemetryData.DeviceOS
|
|
}
|
|
if telemetryData.CoderDesktopVersion != "" {
|
|
event.CoderDesktopVersion = &telemetryData.CoderDesktopVersion
|
|
}
|
|
logger.Debug(r.Context(), "received desktop telemetry",
|
|
slog.F("device_id", telemetryData.DeviceID),
|
|
slog.F("device_os", telemetryData.DeviceOS),
|
|
slog.F("desktop_version", telemetryData.CoderDesktopVersion))
|
|
} else {
|
|
logger.Warn(r.Context(), "failed to parse desktop telemetry header", slog.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// createExternalAuthResponse creates an ExternalAuthResponse based on the
|
|
// provider type. This is to support legacy `/workspaceagents/me/gitauth`
|
|
// which uses `Username` and `Password`.
|
|
func createExternalAuthResponse(typ, token string, extra pqtype.NullRawMessage) (agentsdk.ExternalAuthResponse, error) {
|
|
var resp agentsdk.ExternalAuthResponse
|
|
switch typ {
|
|
case string(codersdk.EnhancedExternalAuthProviderGitLab):
|
|
// https://stackoverflow.com/questions/25409700/using-gitlab-token-to-clone-without-authentication
|
|
resp = agentsdk.ExternalAuthResponse{
|
|
Username: "oauth2",
|
|
Password: token,
|
|
}
|
|
case string(codersdk.EnhancedExternalAuthProviderBitBucketCloud), string(codersdk.EnhancedExternalAuthProviderBitBucketServer):
|
|
// The string "bitbucket" was a legacy parameter that needs to still be supported.
|
|
// https://support.atlassian.com/bitbucket-cloud/docs/use-oauth-on-bitbucket-cloud/#Cloning-a-repository-with-an-access-token
|
|
resp = agentsdk.ExternalAuthResponse{
|
|
Username: "x-token-auth",
|
|
Password: token,
|
|
}
|
|
default:
|
|
resp = agentsdk.ExternalAuthResponse{
|
|
Username: token,
|
|
}
|
|
}
|
|
resp.AccessToken = token
|
|
resp.Type = typ
|
|
|
|
var err error
|
|
if extra.Valid {
|
|
err = json.Unmarshal(extra.RawMessage, &resp.TokenExtra)
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
func convertWorkspaceAgentLogs(logs []database.WorkspaceAgentLog) []codersdk.WorkspaceAgentLog {
|
|
sdk := make([]codersdk.WorkspaceAgentLog, 0, len(logs))
|
|
for _, logEntry := range logs {
|
|
sdk = append(sdk, db2sdk.WorkspaceAgentLog(logEntry))
|
|
}
|
|
return sdk
|
|
}
|