mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
a7c9e623fe
In https://github.com/coder/coder/pull/20137, we added a new flag to `coder provisioner jobs list`, namely `--initiator`. To make some follow-up worth it, I need to rename an API param used in the process before it becomes part of our released and tagged API. Instead of only accepting UUIDs, we accept an arbitrary string. We still validate it as a UUID now, but we will expand its validation to allow any string and then resolve that string the same way that we resolve the user parameter elsewhere in the API.
660 lines
21 KiB
Go
660 lines
21 KiB
Go
package coderd
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"sort"
|
|
"strconv"
|
|
|
|
"github.com/google/uuid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"cdr.dev/slog"
|
|
"github.com/coder/coder/v2/coderd/database"
|
|
"github.com/coder/coder/v2/coderd/database/db2sdk"
|
|
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
|
"github.com/coder/coder/v2/coderd/database/pubsub"
|
|
"github.com/coder/coder/v2/coderd/httpapi"
|
|
"github.com/coder/coder/v2/coderd/httpmw"
|
|
"github.com/coder/coder/v2/coderd/httpmw/loggermw"
|
|
"github.com/coder/coder/v2/coderd/rbac"
|
|
"github.com/coder/coder/v2/coderd/rbac/policy"
|
|
"github.com/coder/coder/v2/coderd/util/slice"
|
|
"github.com/coder/coder/v2/codersdk"
|
|
"github.com/coder/coder/v2/codersdk/wsjson"
|
|
"github.com/coder/coder/v2/provisionersdk"
|
|
"github.com/coder/websocket"
|
|
)
|
|
|
|
// @Summary Get provisioner job
|
|
// @ID get-provisioner-job
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Organizations
|
|
// @Param organization path string true "Organization ID" format(uuid)
|
|
// @Param job path string true "Job ID" format(uuid)
|
|
// @Success 200 {object} codersdk.ProvisionerJob
|
|
// @Router /organizations/{organization}/provisionerjobs/{job} [get]
|
|
func (api *API) provisionerJob(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
jobID, ok := httpmw.ParseUUIDParam(rw, r, "job")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
jobs, ok := api.handleAuthAndFetchProvisionerJobs(rw, r, []uuid.UUID{jobID})
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(jobs) == 0 {
|
|
httpapi.ResourceNotFound(rw)
|
|
return
|
|
}
|
|
if len(jobs) > 1 || jobs[0].ProvisionerJob.ID != jobID {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner job.",
|
|
Detail: "Database returned an unexpected job.",
|
|
})
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, convertProvisionerJobWithQueuePosition(jobs[0]))
|
|
}
|
|
|
|
// @Summary Get provisioner jobs
|
|
// @ID get-provisioner-jobs
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Organizations
|
|
// @Param organization path string true "Organization ID" format(uuid)
|
|
// @Param limit query int false "Page limit"
|
|
// @Param ids query []string false "Filter results by job IDs" format(uuid)
|
|
// @Param status query codersdk.ProvisionerJobStatus false "Filter results by status" enums(pending,running,succeeded,canceling,canceled,failed)
|
|
// @Param tags query object false "Provisioner tags to filter by (JSON of the form {'tag1':'value1','tag2':'value2'})"
|
|
// @Param initiator query string false "Filter results by initiator" format(uuid)
|
|
// @Success 200 {array} codersdk.ProvisionerJob
|
|
// @Router /organizations/{organization}/provisionerjobs [get]
|
|
func (api *API) provisionerJobs(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
jobs, ok := api.handleAuthAndFetchProvisionerJobs(rw, r, nil)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, db2sdk.List(jobs, convertProvisionerJobWithQueuePosition))
|
|
}
|
|
|
|
// handleAuthAndFetchProvisionerJobs is an internal method shared by
|
|
// provisionerJob and provisionerJobs. If ok is false the caller should
|
|
// return immediately because the response has already been written.
|
|
func (api *API) handleAuthAndFetchProvisionerJobs(rw http.ResponseWriter, r *http.Request, ids []uuid.UUID) (_ []database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, ok bool) {
|
|
ctx := r.Context()
|
|
org := httpmw.OrganizationParam(r)
|
|
|
|
// For now, only owners and template admins can access provisioner jobs.
|
|
if !api.Authorize(r, policy.ActionRead, rbac.ResourceProvisionerJobs.InOrg(org.ID)) {
|
|
httpapi.ResourceNotFound(rw)
|
|
return nil, false
|
|
}
|
|
|
|
qp := r.URL.Query()
|
|
p := httpapi.NewQueryParamParser()
|
|
limit := p.PositiveInt32(qp, 50, "limit")
|
|
status := p.Strings(qp, nil, "status")
|
|
if ids == nil {
|
|
ids = p.UUIDs(qp, nil, "ids")
|
|
}
|
|
tags := p.JSONStringMap(qp, database.StringMap{}, "tags")
|
|
initiatorID := p.UUID(qp, uuid.Nil, "initiator")
|
|
p.ErrorExcessParams(qp)
|
|
if len(p.Errors) > 0 {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Invalid query parameters.",
|
|
Validations: p.Errors,
|
|
})
|
|
return nil, false
|
|
}
|
|
|
|
jobs, err := api.Database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx, database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams{
|
|
OrganizationID: org.ID,
|
|
Status: slice.StringEnums[database.ProvisionerJobStatus](status),
|
|
Limit: sql.NullInt32{Int32: limit, Valid: limit > 0},
|
|
IDs: ids,
|
|
Tags: tags,
|
|
InitiatorID: initiatorID,
|
|
})
|
|
if err != nil {
|
|
if httpapi.Is404Error(err) {
|
|
httpapi.ResourceNotFound(rw)
|
|
return nil, false
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner jobs.",
|
|
Detail: err.Error(),
|
|
})
|
|
return nil, false
|
|
}
|
|
|
|
return jobs, true
|
|
}
|
|
|
|
// Returns provisioner logs based on query parameters.
|
|
// The intended usage for a client to stream all logs (with JS API):
|
|
// GET /logs
|
|
// GET /logs?after=<id>&follow
|
|
// The combination of these responses should provide all current logs
|
|
// to the consumer, and future logs are streamed in the follow request.
|
|
func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
|
|
var (
|
|
ctx = r.Context()
|
|
logger = api.Logger.With(slog.F("job_id", job.ID))
|
|
follow = r.URL.Query().Has("follow")
|
|
afterRaw = r.URL.Query().Get("after")
|
|
)
|
|
|
|
var after int64
|
|
// Only fetch logs created after the time provided.
|
|
if afterRaw != "" {
|
|
var err error
|
|
after, err = strconv.ParseInt(afterRaw, 10, 64)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Query param \"after\" must be an integer.",
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "after", Detail: "Must be an integer"},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
}
|
|
|
|
if !follow {
|
|
fetchAndWriteLogs(ctx, api.Database, job.ID, after, rw)
|
|
return
|
|
}
|
|
|
|
follower := newLogFollower(ctx, logger, api.Database, api.Pubsub, rw, r, job, after)
|
|
api.WebsocketWaitMutex.Lock()
|
|
api.WebsocketWaitGroup.Add(1)
|
|
api.WebsocketWaitMutex.Unlock()
|
|
defer api.WebsocketWaitGroup.Done()
|
|
follower.follow()
|
|
}
|
|
|
|
func (api *API) provisionerJobResources(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
|
|
ctx := r.Context()
|
|
if !job.CompletedAt.Valid {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Job hasn't completed!",
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceResourcesByJobID is a system function.
|
|
resources, err := api.Database.GetWorkspaceResourcesByJobID(dbauthz.AsSystemRestricted(ctx), job.ID)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching job resources.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
resourceIDs := make([]uuid.UUID, 0)
|
|
for _, resource := range resources {
|
|
resourceIDs = append(resourceIDs, resource.ID)
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAgentsByResourceIDs is a system function.
|
|
resourceAgents, err := api.Database.GetWorkspaceAgentsByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
resourceAgentIDs := make([]uuid.UUID, 0)
|
|
for _, agent := range resourceAgents {
|
|
resourceAgentIDs = append(resourceAgentIDs, agent.ID)
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAppsByAgentIDs is a system function.
|
|
apps, err := api.Database.GetWorkspaceAppsByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace applications.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAgentScriptsByAgentIDs is a system function.
|
|
scripts, err := api.Database.GetWorkspaceAgentScriptsByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent scripts.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAgentLogSourcesByAgentIDs is a system function.
|
|
logSources, err := api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent log sources.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceResourceMetadataByResourceIDs is a system function.
|
|
resourceMetadata, err := api.Database.GetWorkspaceResourceMetadataByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace metadata.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
apiResources := make([]codersdk.WorkspaceResource, 0)
|
|
for _, resource := range resources {
|
|
agents := make([]codersdk.WorkspaceAgent, 0)
|
|
for _, agent := range resourceAgents {
|
|
if agent.ResourceID != resource.ID {
|
|
continue
|
|
}
|
|
dbApps := make([]database.WorkspaceApp, 0)
|
|
for _, app := range apps {
|
|
if app.AgentID == agent.ID {
|
|
dbApps = append(dbApps, app)
|
|
}
|
|
}
|
|
dbScripts := make([]database.WorkspaceAgentScript, 0)
|
|
for _, script := range scripts {
|
|
if script.WorkspaceAgentID == agent.ID {
|
|
dbScripts = append(dbScripts, script)
|
|
}
|
|
}
|
|
dbLogSources := make([]database.WorkspaceAgentLogSource, 0)
|
|
for _, logSource := range logSources {
|
|
if logSource.WorkspaceAgentID == agent.ID {
|
|
dbLogSources = append(dbLogSources, logSource)
|
|
}
|
|
}
|
|
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(), *api.TailnetCoordinator.Load(), agent, convertProvisionedApps(dbApps), convertScripts(dbScripts), convertLogSources(dbLogSources), api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading job agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
agents = append(agents, apiAgent)
|
|
}
|
|
metadata := make([]database.WorkspaceResourceMetadatum, 0)
|
|
for _, field := range resourceMetadata {
|
|
if field.WorkspaceResourceID == resource.ID {
|
|
metadata = append(metadata, field)
|
|
}
|
|
}
|
|
apiResources = append(apiResources, convertWorkspaceResource(resource, agents, metadata))
|
|
}
|
|
sort.Slice(apiResources, func(i, j int) bool {
|
|
return apiResources[i].Name < apiResources[j].Name
|
|
})
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, apiResources)
|
|
}
|
|
|
|
func convertProvisionerJobLogs(provisionerJobLogs []database.ProvisionerJobLog) []codersdk.ProvisionerJobLog {
|
|
sdk := make([]codersdk.ProvisionerJobLog, 0, len(provisionerJobLogs))
|
|
for _, log := range provisionerJobLogs {
|
|
sdk = append(sdk, convertProvisionerJobLog(log))
|
|
}
|
|
return sdk
|
|
}
|
|
|
|
func convertProvisionerJobLog(provisionerJobLog database.ProvisionerJobLog) codersdk.ProvisionerJobLog {
|
|
return codersdk.ProvisionerJobLog{
|
|
ID: provisionerJobLog.ID,
|
|
CreatedAt: provisionerJobLog.CreatedAt,
|
|
Source: codersdk.LogSource(provisionerJobLog.Source),
|
|
Level: codersdk.LogLevel(provisionerJobLog.Level),
|
|
Stage: provisionerJobLog.Stage,
|
|
Output: provisionerJobLog.Output,
|
|
}
|
|
}
|
|
|
|
func convertProvisionerJob(pj database.GetProvisionerJobsByIDsWithQueuePositionRow) codersdk.ProvisionerJob {
|
|
provisionerJob := pj.ProvisionerJob
|
|
job := codersdk.ProvisionerJob{
|
|
ID: provisionerJob.ID,
|
|
OrganizationID: provisionerJob.OrganizationID,
|
|
InitiatorID: provisionerJob.InitiatorID,
|
|
CreatedAt: provisionerJob.CreatedAt,
|
|
Type: codersdk.ProvisionerJobType(provisionerJob.Type),
|
|
Error: provisionerJob.Error.String,
|
|
ErrorCode: codersdk.JobErrorCode(provisionerJob.ErrorCode.String),
|
|
FileID: provisionerJob.FileID,
|
|
Tags: provisionerJob.Tags,
|
|
QueuePosition: int(pj.QueuePosition),
|
|
QueueSize: int(pj.QueueSize),
|
|
LogsOverflowed: provisionerJob.LogsOverflowed,
|
|
}
|
|
// Applying values optional to the struct.
|
|
if provisionerJob.StartedAt.Valid {
|
|
job.StartedAt = &provisionerJob.StartedAt.Time
|
|
}
|
|
if provisionerJob.CompletedAt.Valid {
|
|
job.CompletedAt = &provisionerJob.CompletedAt.Time
|
|
}
|
|
if provisionerJob.CanceledAt.Valid {
|
|
job.CanceledAt = &provisionerJob.CanceledAt.Time
|
|
}
|
|
if provisionerJob.WorkerID.Valid {
|
|
job.WorkerID = &provisionerJob.WorkerID.UUID
|
|
}
|
|
job.Status = codersdk.ProvisionerJobStatus(pj.ProvisionerJob.JobStatus)
|
|
|
|
// Only unmarshal input if it exists, this should only be zero in testing.
|
|
if len(provisionerJob.Input) > 0 {
|
|
if err := json.Unmarshal(provisionerJob.Input, &job.Input); err != nil {
|
|
job.Input.Error = xerrors.Errorf("decode input %s: %w", provisionerJob.Input, err).Error()
|
|
}
|
|
}
|
|
|
|
return job
|
|
}
|
|
|
|
func convertProvisionerJobWithQueuePosition(pj database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow) codersdk.ProvisionerJob {
|
|
job := convertProvisionerJob(database.GetProvisionerJobsByIDsWithQueuePositionRow{
|
|
ProvisionerJob: pj.ProvisionerJob,
|
|
QueuePosition: pj.QueuePosition,
|
|
QueueSize: pj.QueueSize,
|
|
})
|
|
job.WorkerName = pj.WorkerName
|
|
job.AvailableWorkers = pj.AvailableWorkers
|
|
job.Metadata = codersdk.ProvisionerJobMetadata{
|
|
TemplateVersionName: pj.TemplateVersionName,
|
|
TemplateID: pj.TemplateID.UUID,
|
|
TemplateName: pj.TemplateName,
|
|
TemplateDisplayName: pj.TemplateDisplayName,
|
|
TemplateIcon: pj.TemplateIcon,
|
|
WorkspaceName: pj.WorkspaceName,
|
|
}
|
|
if pj.WorkspaceID.Valid {
|
|
job.Metadata.WorkspaceID = &pj.WorkspaceID.UUID
|
|
}
|
|
return job
|
|
}
|
|
|
|
func fetchAndWriteLogs(ctx context.Context, db database.Store, jobID uuid.UUID, after int64, rw http.ResponseWriter) {
|
|
logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
|
|
JobID: jobID,
|
|
CreatedAfter: after,
|
|
})
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner logs.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if logs == nil {
|
|
logs = []database.ProvisionerJobLog{}
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusOK, convertProvisionerJobLogs(logs))
|
|
}
|
|
|
|
func jobIsComplete(logger slog.Logger, job database.ProvisionerJob) bool {
|
|
status := codersdk.ProvisionerJobStatus(job.JobStatus)
|
|
switch status {
|
|
case codersdk.ProvisionerJobCanceled:
|
|
return true
|
|
case codersdk.ProvisionerJobFailed:
|
|
return true
|
|
case codersdk.ProvisionerJobSucceeded:
|
|
return true
|
|
case codersdk.ProvisionerJobPending:
|
|
return false
|
|
case codersdk.ProvisionerJobCanceling:
|
|
return false
|
|
case codersdk.ProvisionerJobRunning:
|
|
return false
|
|
default:
|
|
logger.Error(context.Background(),
|
|
"can't convert the provisioner job status",
|
|
slog.F("job_id", job.ID), slog.F("status", status))
|
|
return false
|
|
}
|
|
}
|
|
|
|
// logFollower holds the state of one "follow" request: it streams the logs
// of a single provisioner job to a client over a websocket, using pubsub
// notifications to learn when new logs are available.
type logFollower struct {
	// ctx starts as the context given to newLogFollower; follow() replaces
	// it with a cancellable child.
	ctx    context.Context
	logger slog.Logger
	// db is used to re-read the job and to fetch its logs.
	db database.Store
	// pubsub is used to subscribe to the job's log notifications.
	pubsub pubsub.Pubsub
	// r and rw are the original request and response writer; they are used
	// for HTTP error responses and for accepting the websocket.
	r  *http.Request
	rw http.ResponseWriter
	// conn and enc are set by follow() once the websocket is accepted.
	conn *websocket.Conn
	enc  *wsjson.Encoder[codersdk.ProvisionerJobLog]

	jobID uuid.UUID
	// after is the log cursor: query() fetches logs past it and advances it
	// to the ID of each log it writes.
	after int64
	// complete records whether the job was in a terminal status when last
	// checked.
	complete bool
	// notifications and errors are fed by listener() and drained by
	// follow().
	notifications chan provisionersdk.ProvisionerJobLogsNotifyMessage
	errors        chan error
}
|
|
|
|
func newLogFollower(
|
|
ctx context.Context, logger slog.Logger, db database.Store, ps pubsub.Pubsub,
|
|
rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob, after int64,
|
|
) *logFollower {
|
|
return &logFollower{
|
|
ctx: ctx,
|
|
logger: logger,
|
|
db: db,
|
|
pubsub: ps,
|
|
r: r,
|
|
rw: rw,
|
|
jobID: job.ID,
|
|
after: after,
|
|
complete: jobIsComplete(logger, job),
|
|
notifications: make(chan provisionersdk.ProvisionerJobLogsNotifyMessage),
|
|
errors: make(chan error),
|
|
}
|
|
}
|
|
|
|
func (f *logFollower) follow() {
|
|
var cancel context.CancelFunc
|
|
f.ctx, cancel = context.WithCancel(f.ctx)
|
|
defer cancel()
|
|
// note that we only need to subscribe to updates if the job is not yet
|
|
// complete.
|
|
if !f.complete {
|
|
subCancel, err := f.pubsub.SubscribeWithErr(
|
|
provisionersdk.ProvisionerJobLogsNotifyChannel(f.jobID),
|
|
f.listener,
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(f.ctx, f.rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "failed to subscribe to job updates",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer subCancel()
|
|
// Move cancel up the stack so it happens before unsubscribing,
|
|
// otherwise we can end up in a deadlock due to how the
|
|
// in-memory pubsub does mutex locking on send/unsubscribe.
|
|
defer cancel()
|
|
|
|
// we were provided `complete` prior to starting this subscription, so
|
|
// we also need to check whether the job is now complete, in case the
|
|
// job completed between the last time we queried the job and the start
|
|
// of the subscription. If the job completes after this, we will get
|
|
// a notification on the subscription.
|
|
job, err := f.db.GetProvisionerJobByID(f.ctx, f.jobID)
|
|
if err != nil {
|
|
httpapi.Write(f.ctx, f.rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "failed to query job",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
f.complete = jobIsComplete(f.logger, job)
|
|
f.logger.Debug(f.ctx, "queried job after subscribe", slog.F("complete", f.complete))
|
|
}
|
|
|
|
var err error
|
|
f.conn, err = websocket.Accept(f.rw, f.r, nil)
|
|
if err != nil {
|
|
httpapi.Write(f.ctx, f.rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to accept websocket.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer f.conn.Close(websocket.StatusNormalClosure, "done")
|
|
go httpapi.Heartbeat(f.ctx, f.conn)
|
|
f.enc = wsjson.NewEncoder[codersdk.ProvisionerJobLog](f.conn, websocket.MessageText)
|
|
|
|
// query for logs once right away, so we can get historical data from before
|
|
// subscription
|
|
if err := f.query(); err != nil {
|
|
if f.ctx.Err() == nil && !xerrors.Is(err, io.EOF) {
|
|
// neither context expiry, nor EOF, close and log
|
|
f.logger.Error(f.ctx, "failed to query logs", slog.Error(err))
|
|
err = f.conn.Close(websocket.StatusInternalError, err.Error())
|
|
if err != nil {
|
|
f.logger.Warn(f.ctx, "failed to close webscoket", slog.Error(err))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Log the request immediately instead of after it completes.
|
|
if rl := loggermw.RequestLoggerFromContext(f.ctx); rl != nil {
|
|
rl.WriteLog(f.ctx, http.StatusAccepted)
|
|
}
|
|
|
|
// no need to wait if the job is done
|
|
if f.complete {
|
|
return
|
|
}
|
|
for {
|
|
select {
|
|
case err := <-f.errors:
|
|
// we've dropped at least one notification. This can happen if we
|
|
// lose database connectivity. We don't know whether the job is
|
|
// now complete since we could have missed the end of logs message.
|
|
// We could soldier on and retry, but loss of database connectivity
|
|
// is fairly serious, so instead just 500 and bail out. Client
|
|
// can retry and hopefully find a healthier node.
|
|
f.logger.Error(f.ctx, "dropped or corrupted notification", slog.Error(err))
|
|
err = f.conn.Close(websocket.StatusInternalError, err.Error())
|
|
if err != nil {
|
|
f.logger.Warn(f.ctx, "failed to close webscoket", slog.Error(err))
|
|
}
|
|
return
|
|
case <-f.ctx.Done():
|
|
// client disconnect
|
|
return
|
|
case n := <-f.notifications:
|
|
if n.EndOfLogs {
|
|
// safe to return here because we started the subscription,
|
|
// and then queried at least once, so we will have already
|
|
// gotten all logs prior to the start of our subscription.
|
|
return
|
|
}
|
|
err = f.query()
|
|
if err != nil {
|
|
if f.ctx.Err() == nil && !xerrors.Is(err, io.EOF) {
|
|
// neither context expiry, nor EOF, close and log
|
|
f.logger.Error(f.ctx, "failed to query logs", slog.Error(err))
|
|
err = f.conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("%s", err.Error()))
|
|
if err != nil {
|
|
f.logger.Warn(f.ctx, "failed to close webscoket", slog.Error(err))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *logFollower) listener(_ context.Context, message []byte, err error) {
|
|
// in this function we always pair writes to channels with a select on the context
|
|
// otherwise we could block a goroutine if the follow() method exits.
|
|
if err != nil {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.errors <- err:
|
|
}
|
|
return
|
|
}
|
|
var n provisionersdk.ProvisionerJobLogsNotifyMessage
|
|
err = json.Unmarshal(message, &n)
|
|
if err != nil {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.errors <- err:
|
|
}
|
|
return
|
|
}
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.notifications <- n:
|
|
}
|
|
}
|
|
|
|
// query fetches the latest job logs from the database and writes them to the
|
|
// connection.
|
|
func (f *logFollower) query() error {
|
|
f.logger.Debug(f.ctx, "querying logs", slog.F("after", f.after))
|
|
logs, err := f.db.GetProvisionerLogsAfterID(f.ctx, database.GetProvisionerLogsAfterIDParams{
|
|
JobID: f.jobID,
|
|
CreatedAfter: f.after,
|
|
})
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return xerrors.Errorf("error fetching logs: %w", err)
|
|
}
|
|
for _, log := range logs {
|
|
err := f.enc.Encode(convertProvisionerJobLog(log))
|
|
if err != nil {
|
|
return xerrors.Errorf("error writing to websocket: %w", err)
|
|
}
|
|
f.after = log.ID
|
|
f.logger.Debug(f.ctx, "wrote log to websocket", slog.F("id", log.ID))
|
|
}
|
|
return nil
|
|
}
|