mirror of
https://github.com/coder/coder.git
synced 2026-06-06 14:38:23 +00:00
46e04c68e3
This pull request adds support for presets to coder provisioners. If a template defines presets using a compatible version of the provider, then this PR will allow those presets to be persisted to the control plane database for use in workspace creation.
1178 lines
36 KiB
Go
1178 lines
36 KiB
Go
package runner
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"golang.org/x/xerrors"
|
|
|
|
"cdr.dev/slog"
|
|
|
|
"github.com/coder/coder/v2/coderd/tracing"
|
|
"github.com/coder/coder/v2/coderd/util/ptr"
|
|
"github.com/coder/coder/v2/provisionerd/proto"
|
|
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
|
|
)
|
|
|
|
const (
|
|
MissingParameterErrorCode = "MISSING_TEMPLATE_PARAMETER"
|
|
missingParameterErrorText = "missing parameter"
|
|
|
|
RequiredTemplateVariablesErrorCode = "REQUIRED_TEMPLATE_VARIABLES"
|
|
requiredTemplateVariablesErrorText = "required template variables"
|
|
)
|
|
|
|
var errorCodes = map[string]string{
|
|
MissingParameterErrorCode: missingParameterErrorText,
|
|
RequiredTemplateVariablesErrorCode: requiredTemplateVariablesErrorText,
|
|
}
|
|
|
|
var errUpdateSkipped = xerrors.New("update skipped; job complete or failed")
|
|
|
|
type Runner struct {
|
|
tracer trace.Tracer
|
|
metrics Metrics
|
|
job *proto.AcquiredJob
|
|
sender JobUpdater
|
|
quotaCommitter QuotaCommitter
|
|
logger slog.Logger
|
|
provisioner sdkproto.DRPCProvisionerClient
|
|
lastUpdate atomic.Pointer[time.Time]
|
|
updateInterval time.Duration
|
|
forceCancelInterval time.Duration
|
|
logBufferInterval time.Duration
|
|
|
|
// session is the provisioning session with the (possibly remote) provisioner
|
|
session sdkproto.DRPCProvisioner_SessionClient
|
|
// closed when the Runner is finished sending any updates/failed/complete.
|
|
done chan struct{}
|
|
// active as long as we are not canceled
|
|
notCanceled context.Context
|
|
cancel context.CancelFunc
|
|
// active as long as we haven't been force stopped
|
|
notStopped context.Context
|
|
stop context.CancelFunc
|
|
|
|
// mutex controls access to all the following variables.
|
|
mutex *sync.Mutex
|
|
// used to wait for the failedJob or completedJob to be populated
|
|
cond *sync.Cond
|
|
flushLogsTimer *time.Timer
|
|
queuedLogs []*proto.Log
|
|
failedJob *proto.FailedJob
|
|
completedJob *proto.CompletedJob
|
|
// setting this false signals that no more messages about this job should be sent. Usually this
|
|
// means that a terminal message like FailedJob or CompletedJob has been sent, even in the case
|
|
// of a Cancel(). However, when someone calls Fail() or ForceStop(), we might not send the
|
|
// terminal message, but okToSend is set to false regardless.
|
|
okToSend bool
|
|
}
|
|
|
|
type Metrics struct {
|
|
ConcurrentJobs *prometheus.GaugeVec
|
|
NumDaemons prometheus.Gauge
|
|
// JobTimings also counts the total amount of jobs.
|
|
JobTimings *prometheus.HistogramVec
|
|
// WorkspaceBuilds counts workspace build successes and failures.
|
|
WorkspaceBuilds *prometheus.CounterVec
|
|
WorkspaceBuildTimings *prometheus.HistogramVec
|
|
}
|
|
|
|
type JobUpdater interface {
|
|
UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error)
|
|
FailJob(ctx context.Context, in *proto.FailedJob) error
|
|
CompleteJob(ctx context.Context, in *proto.CompletedJob) error
|
|
}
|
|
type QuotaCommitter interface {
|
|
CommitQuota(ctx context.Context, in *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error)
|
|
}
|
|
|
|
type Options struct {
|
|
Updater JobUpdater
|
|
QuotaCommitter QuotaCommitter
|
|
Logger slog.Logger
|
|
Provisioner sdkproto.DRPCProvisionerClient
|
|
UpdateInterval time.Duration
|
|
ForceCancelInterval time.Duration
|
|
LogDebounceInterval time.Duration
|
|
Tracer trace.Tracer
|
|
Metrics Metrics
|
|
}
|
|
|
|
func New(
|
|
ctx context.Context,
|
|
job *proto.AcquiredJob,
|
|
opts Options,
|
|
) *Runner {
|
|
m := new(sync.Mutex)
|
|
|
|
// we need to create our contexts here in case a call to Cancel() comes immediately.
|
|
forceStopContext, forceStopFunc := context.WithCancel(ctx)
|
|
gracefulContext, cancelFunc := context.WithCancel(forceStopContext)
|
|
|
|
logger := opts.Logger.With(slog.F("job_id", job.JobId))
|
|
if build := job.GetWorkspaceBuild(); build != nil {
|
|
logger = logger.With(
|
|
slog.F("template_name", build.Metadata.TemplateName),
|
|
slog.F("template_version", build.Metadata.TemplateVersion),
|
|
slog.F("workspace_build_id", build.WorkspaceBuildId),
|
|
slog.F("workspace_id", build.Metadata.WorkspaceId),
|
|
slog.F("workspace_name", build.Metadata.WorkspaceName),
|
|
slog.F("workspace_owner", build.Metadata.WorkspaceOwner),
|
|
slog.F("workspace_transition", strings.ToLower(build.Metadata.WorkspaceTransition.String())),
|
|
)
|
|
}
|
|
|
|
return &Runner{
|
|
tracer: opts.Tracer,
|
|
metrics: opts.Metrics,
|
|
job: job,
|
|
sender: opts.Updater,
|
|
quotaCommitter: opts.QuotaCommitter,
|
|
logger: logger,
|
|
provisioner: opts.Provisioner,
|
|
updateInterval: opts.UpdateInterval,
|
|
forceCancelInterval: opts.ForceCancelInterval,
|
|
logBufferInterval: opts.LogDebounceInterval,
|
|
queuedLogs: make([]*proto.Log, 0),
|
|
mutex: m,
|
|
cond: sync.NewCond(m),
|
|
done: make(chan struct{}),
|
|
okToSend: true,
|
|
notStopped: forceStopContext,
|
|
stop: forceStopFunc,
|
|
notCanceled: gracefulContext,
|
|
cancel: cancelFunc,
|
|
}
|
|
}
|
|
|
|
// Run executes the job.
|
|
//
|
|
// the idea here is to run two goroutines to work on the job: doCleanFinish and heartbeat, then use
|
|
// the `r.cond` to wait until the job is either complete or failed. This function then sends the
|
|
// complete or failed message --- the exception to this is if something calls Fail() on the Runner;
|
|
// either something external, like the server getting closed, or the heartbeat goroutine timing out
|
|
// after attempting to gracefully cancel. If something calls Fail(), then the failure is sent on
|
|
// that goroutine on the context passed into Fail(), and it marks okToSend false to signal us here
|
|
// that this function should not also send a terminal message.
|
|
func (r *Runner) Run() {
|
|
start := time.Now()
|
|
ctx, span := r.startTrace(r.notStopped, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
concurrentGauge := r.metrics.ConcurrentJobs.WithLabelValues(r.job.Provisioner)
|
|
concurrentGauge.Inc()
|
|
defer func() {
|
|
status := "success"
|
|
if r.failedJob != nil {
|
|
status = "failed"
|
|
}
|
|
|
|
concurrentGauge.Dec()
|
|
if build := r.job.GetWorkspaceBuild(); build != nil {
|
|
r.metrics.WorkspaceBuilds.WithLabelValues(
|
|
build.Metadata.WorkspaceOwner,
|
|
build.Metadata.WorkspaceName,
|
|
build.Metadata.TemplateName,
|
|
build.Metadata.TemplateVersion,
|
|
build.Metadata.WorkspaceTransition.String(),
|
|
status,
|
|
).Inc()
|
|
r.metrics.WorkspaceBuildTimings.WithLabelValues(
|
|
build.Metadata.TemplateName,
|
|
build.Metadata.TemplateVersion,
|
|
build.Metadata.WorkspaceTransition.String(),
|
|
status,
|
|
).Observe(time.Since(start).Seconds())
|
|
}
|
|
r.metrics.JobTimings.WithLabelValues(r.job.Provisioner, status).Observe(time.Since(start).Seconds())
|
|
}()
|
|
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
defer r.stop()
|
|
|
|
go r.doCleanFinish(ctx)
|
|
go r.heartbeatRoutine(ctx)
|
|
for r.failedJob == nil && r.completedJob == nil {
|
|
r.cond.Wait()
|
|
}
|
|
if !r.okToSend {
|
|
// nothing else to do.
|
|
return
|
|
}
|
|
if r.failedJob != nil {
|
|
span.RecordError(xerrors.New(r.failedJob.Error))
|
|
span.SetStatus(codes.Error, r.failedJob.Error)
|
|
|
|
r.logger.Debug(ctx, "sending FailedJob")
|
|
err := r.sender.FailJob(ctx, r.failedJob)
|
|
if err != nil {
|
|
r.logger.Error(ctx, "sending FailJob failed", slog.Error(err))
|
|
} else {
|
|
r.logger.Debug(ctx, "sent FailedJob")
|
|
}
|
|
} else {
|
|
r.logger.Debug(ctx, "sending CompletedJob")
|
|
err := r.sender.CompleteJob(ctx, r.completedJob)
|
|
if err != nil {
|
|
r.logger.Error(ctx, "sending CompletedJob failed", slog.Error(err))
|
|
err = r.sender.FailJob(ctx, r.failedJobf("internal provisionerserver error: %s", err))
|
|
if err != nil {
|
|
r.logger.Error(ctx, "sending FailJob failed (while CompletedJob)", slog.Error(err))
|
|
}
|
|
} else {
|
|
r.logger.Debug(ctx, "sent CompletedJob")
|
|
}
|
|
}
|
|
close(r.done)
|
|
r.okToSend = false
|
|
}
|
|
|
|
// Cancel initiates a Cancel on the job, but allows it to keep running to do so gracefully. Read from Done() to
|
|
// be notified when the job completes.
|
|
func (r *Runner) Cancel() {
|
|
r.cancel()
|
|
}
|
|
|
|
func (r *Runner) Done() <-chan struct{} {
|
|
return r.done
|
|
}
|
|
|
|
// Fail immediately halts updates and, if the job is not complete sends FailJob to the coder server. Running goroutines
|
|
// are canceled but complete asynchronously (although they are prevented from further updating the job to the coder
|
|
// server). The provided context sets how long to keep trying to send the FailJob.
|
|
func (r *Runner) Fail(ctx context.Context, f *proto.FailedJob) error {
|
|
f.JobId = r.job.JobId
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
if !r.okToSend {
|
|
return nil // already done
|
|
}
|
|
r.Cancel()
|
|
if r.failedJob == nil {
|
|
r.failedJob = f
|
|
r.cond.Signal()
|
|
}
|
|
// here we keep the original failed reason if there already was one, but we hadn't yet sent it. It is likely more
|
|
// informative of the job failing due to some problem running it, whereas this function is used to externally
|
|
// force a Fail.
|
|
err := r.sender.FailJob(ctx, r.failedJob)
|
|
r.okToSend = false
|
|
r.stop()
|
|
close(r.done)
|
|
return err
|
|
}
|
|
|
|
// setComplete is an internal function to set the job to completed. This does not send the completedJob.
|
|
func (r *Runner) setComplete(c *proto.CompletedJob) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
if r.completedJob == nil {
|
|
r.completedJob = c
|
|
r.cond.Signal()
|
|
}
|
|
}
|
|
|
|
// setFail is an internal function to set the job to failed. This does not send the failedJob.
|
|
func (r *Runner) setFail(f *proto.FailedJob) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
if r.failedJob == nil {
|
|
f.JobId = r.job.JobId
|
|
r.failedJob = f
|
|
r.cond.Signal()
|
|
}
|
|
}
|
|
|
|
// ForceStop signals all goroutines to stop and prevents any further API calls back to coder server for this job
|
|
func (r *Runner) ForceStop() {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
r.stop()
|
|
if !r.okToSend {
|
|
return
|
|
}
|
|
r.okToSend = false
|
|
close(r.done)
|
|
// doesn't matter what we put here, since it won't get sent! Just need something to satisfy the condition in
|
|
// Start()
|
|
r.failedJob = &proto.FailedJob{}
|
|
r.cond.Signal()
|
|
}
|
|
|
|
func (r *Runner) sendHeartbeat(ctx context.Context) (*proto.UpdateJobResponse, error) {
|
|
ctx, span := r.startTrace(ctx, "updateHeartbeat")
|
|
defer span.End()
|
|
|
|
r.mutex.Lock()
|
|
if !r.okToSend {
|
|
r.mutex.Unlock()
|
|
return nil, errUpdateSkipped
|
|
}
|
|
r.mutex.Unlock()
|
|
|
|
// Skip sending a heartbeat if we've sent an update recently.
|
|
if lastUpdate := r.lastUpdate.Load(); lastUpdate != nil {
|
|
if time.Since(*lastUpdate) < r.updateInterval {
|
|
span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
|
|
return &proto.UpdateJobResponse{}, nil
|
|
}
|
|
}
|
|
|
|
return r.update(ctx, &proto.UpdateJobRequest{
|
|
JobId: r.job.JobId,
|
|
})
|
|
}
|
|
|
|
func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
defer func() {
|
|
r.lastUpdate.Store(ptr.Ref(time.Now()))
|
|
}()
|
|
|
|
span.SetAttributes(
|
|
attribute.Int64("logs_len", int64(len(u.Logs))),
|
|
attribute.Int64("template_variables_len", int64(len(u.TemplateVariables))),
|
|
attribute.Int64("user_variable_values_len", int64(len(u.UserVariableValues))),
|
|
attribute.Int64("readme_len", int64(len(u.Readme))),
|
|
)
|
|
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
if !r.okToSend {
|
|
return nil, errUpdateSkipped
|
|
}
|
|
|
|
return r.sender.UpdateJob(ctx, u)
|
|
}
|
|
|
|
// doCleanFinish wraps a call to do() with cleaning up the job and setting the terminal messages
|
|
func (r *Runner) doCleanFinish(ctx context.Context) {
|
|
var (
|
|
failedJob *proto.FailedJob
|
|
completedJob *proto.CompletedJob
|
|
)
|
|
|
|
// push the fail/succeed write onto the defer stack before the cleanup, so
|
|
// that cleanup happens before this.
|
|
defer func() {
|
|
_, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
if failedJob != nil {
|
|
r.setFail(failedJob)
|
|
return
|
|
}
|
|
r.setComplete(completedJob)
|
|
}()
|
|
|
|
var err error
|
|
r.session, err = r.provisioner.Session(ctx)
|
|
if err != nil {
|
|
failedJob = r.failedJobf("open session: %s", err)
|
|
return
|
|
}
|
|
defer r.session.Close()
|
|
|
|
defer func() {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER_DAEMON,
|
|
Level: sdkproto.LogLevel_INFO,
|
|
Stage: "Cleaning Up",
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
})
|
|
r.flushQueuedLogs(ctx)
|
|
}()
|
|
|
|
completedJob, failedJob = r.do(ctx)
|
|
}
|
|
|
|
// do actually does the work of running the job
|
|
func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER_DAEMON,
|
|
Level: sdkproto.LogLevel_INFO,
|
|
Stage: "Setting up",
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
})
|
|
|
|
switch jobType := r.job.Type.(type) {
|
|
case *proto.AcquiredJob_TemplateImport_:
|
|
r.logger.Debug(context.Background(), "acquired job is template import",
|
|
slog.F("user_variable_values", redactVariableValues(jobType.TemplateImport.UserVariableValues)),
|
|
)
|
|
|
|
return r.runTemplateImport(ctx)
|
|
case *proto.AcquiredJob_TemplateDryRun_:
|
|
r.logger.Debug(context.Background(), "acquired job is template dry-run",
|
|
slog.F("workspace_name", jobType.TemplateDryRun.Metadata.WorkspaceName),
|
|
slog.F("rich_parameter_values", jobType.TemplateDryRun.RichParameterValues),
|
|
slog.F("variable_values", redactVariableValues(jobType.TemplateDryRun.VariableValues)),
|
|
)
|
|
return r.runTemplateDryRun(ctx)
|
|
case *proto.AcquiredJob_WorkspaceBuild_:
|
|
r.logger.Debug(context.Background(), "acquired job is workspace provision",
|
|
slog.F("workspace_name", jobType.WorkspaceBuild.WorkspaceName),
|
|
slog.F("state_length", len(jobType.WorkspaceBuild.State)),
|
|
slog.F("rich_parameter_values", jobType.WorkspaceBuild.RichParameterValues),
|
|
slog.F("variable_values", redactVariableValues(jobType.WorkspaceBuild.VariableValues)),
|
|
)
|
|
return r.runWorkspaceBuild(ctx)
|
|
default:
|
|
return nil, r.failedJobf("unknown job type %q; ensure your provisioner daemon is up-to-date",
|
|
reflect.TypeOf(r.job.Type).String())
|
|
}
|
|
}
|
|
|
|
func (r *Runner) configure(config *sdkproto.Config) *proto.FailedJob {
|
|
err := r.session.Send(&sdkproto.Request{Type: &sdkproto.Request_Config{Config: config}})
|
|
if err != nil {
|
|
return r.failedJobf("send config: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// heartbeatRoutine periodically sends updates on the job, which keeps coder server
|
|
// from assuming the job is stalled, and allows the runner to learn if the job
|
|
// has been canceled by the user.
|
|
func (r *Runner) heartbeatRoutine(ctx context.Context) {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
ticker := time.NewTicker(r.updateInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-r.notCanceled.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
|
|
resp, err := r.sendHeartbeat(ctx)
|
|
if err != nil {
|
|
// Calling Fail starts cancellation so the process will exit.
|
|
err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
|
|
if err != nil {
|
|
r.logger.Error(ctx, "failed to call FailJob", slog.Error(err))
|
|
}
|
|
return
|
|
}
|
|
if !resp.Canceled {
|
|
ticker.Reset(r.updateInterval)
|
|
continue
|
|
}
|
|
r.logger.Info(ctx, "attempting graceful cancellation")
|
|
r.Cancel()
|
|
// Mark the job as failed after a minute of pending cancellation.
|
|
timer := time.NewTimer(r.forceCancelInterval)
|
|
select {
|
|
case <-timer.C:
|
|
r.logger.Debug(ctx, "cancel timed out")
|
|
err := r.Fail(ctx, r.failedJobf("Cancel timed out"))
|
|
if err != nil {
|
|
r.logger.Warn(ctx, "failed to call FailJob", slog.Error(err))
|
|
}
|
|
return
|
|
case <-r.Done():
|
|
timer.Stop()
|
|
return
|
|
case <-r.notStopped.Done():
|
|
timer.Stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
failedJob := r.configure(&sdkproto.Config{
|
|
TemplateSourceArchive: r.job.GetTemplateSourceArchive(),
|
|
})
|
|
if failedJob != nil {
|
|
return nil, failedJob
|
|
}
|
|
|
|
// Parse parameters and update the job with the parameter specs
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER_DAEMON,
|
|
Level: sdkproto.LogLevel_INFO,
|
|
Stage: "Parsing template parameters",
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
})
|
|
workspaceTags, templateVariables, readme, err := r.runTemplateImportParse(ctx) // TODO workspace_tags
|
|
if err != nil {
|
|
return nil, r.failedJobf("run parse: %s", err)
|
|
}
|
|
|
|
// Once Terraform template variables are parsed, the runner can pass variables
|
|
// to store in database and filter valid ones.
|
|
updateResponse, err := r.update(ctx, &proto.UpdateJobRequest{
|
|
JobId: r.job.JobId,
|
|
TemplateVariables: templateVariables,
|
|
UserVariableValues: r.job.GetTemplateImport().GetUserVariableValues(),
|
|
Readme: readme,
|
|
WorkspaceTags: workspaceTags,
|
|
})
|
|
if err != nil {
|
|
return nil, r.failedJobf("update job: %s", err)
|
|
}
|
|
|
|
// Determine persistent resources
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER_DAEMON,
|
|
Level: sdkproto.LogLevel_INFO,
|
|
Stage: "Detecting persistent resources",
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
})
|
|
startProvision, err := r.runTemplateImportProvision(ctx, updateResponse.VariableValues, &sdkproto.Metadata{
|
|
CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl,
|
|
WorkspaceTransition: sdkproto.WorkspaceTransition_START,
|
|
})
|
|
if err != nil {
|
|
return nil, r.failedJobf("template import provision for start: %s", err)
|
|
}
|
|
|
|
// Determine ephemeral resources.
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER_DAEMON,
|
|
Level: sdkproto.LogLevel_INFO,
|
|
Stage: "Detecting ephemeral resources",
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
})
|
|
stopProvision, err := r.runTemplateImportProvision(ctx, updateResponse.VariableValues, &sdkproto.Metadata{
|
|
CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl,
|
|
WorkspaceTransition: sdkproto.WorkspaceTransition_STOP,
|
|
})
|
|
if err != nil {
|
|
return nil, r.failedJobf("template import provision for stop: %s", err)
|
|
}
|
|
|
|
// For backwards compatibility with older versions of coderd
|
|
externalAuthProviderNames := make([]string, 0, len(startProvision.ExternalAuthProviders))
|
|
for _, it := range startProvision.ExternalAuthProviders {
|
|
externalAuthProviderNames = append(externalAuthProviderNames, it.Id)
|
|
}
|
|
|
|
return &proto.CompletedJob{
|
|
JobId: r.job.JobId,
|
|
Type: &proto.CompletedJob_TemplateImport_{
|
|
TemplateImport: &proto.CompletedJob_TemplateImport{
|
|
StartResources: startProvision.Resources,
|
|
StopResources: stopProvision.Resources,
|
|
RichParameters: startProvision.Parameters,
|
|
ExternalAuthProvidersNames: externalAuthProviderNames,
|
|
ExternalAuthProviders: startProvision.ExternalAuthProviders,
|
|
StartModules: startProvision.Modules,
|
|
StopModules: stopProvision.Modules,
|
|
Presets: startProvision.Presets,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// Parses template variables and README from source.
|
|
func (r *Runner) runTemplateImportParse(ctx context.Context) (
|
|
workspaceTags map[string]string, vars []*sdkproto.TemplateVariable, readme []byte, err error,
|
|
) {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
err = r.session.Send(&sdkproto.Request{Type: &sdkproto.Request_Parse{Parse: &sdkproto.ParseRequest{}}})
|
|
if err != nil {
|
|
return nil, nil, nil, xerrors.Errorf("parse source: %w", err)
|
|
}
|
|
for {
|
|
msg, err := r.session.Recv()
|
|
if err != nil {
|
|
return nil, nil, nil, xerrors.Errorf("recv parse source: %w", err)
|
|
}
|
|
switch msgType := msg.Type.(type) {
|
|
case *sdkproto.Response_Log:
|
|
r.logProvisionerJobLog(context.Background(), msgType.Log.Level, "parse job logged",
|
|
slog.F("level", msgType.Log.Level),
|
|
slog.F("output", msgType.Log.Output),
|
|
)
|
|
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER,
|
|
Level: msgType.Log.Level,
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
Output: msgType.Log.Output,
|
|
Stage: "Parse parameters",
|
|
})
|
|
case *sdkproto.Response_Parse:
|
|
pc := msgType.Parse
|
|
r.logger.Debug(context.Background(), "parse complete",
|
|
slog.F("workspace_tags", pc.WorkspaceTags),
|
|
slog.F("template_variables", pc.TemplateVariables),
|
|
slog.F("readme_len", len(pc.Readme)),
|
|
slog.F("error", pc.Error),
|
|
)
|
|
if pc.Error != "" {
|
|
return nil, nil, nil, xerrors.Errorf("parse error: %s", pc.Error)
|
|
}
|
|
|
|
return msgType.Parse.WorkspaceTags, msgType.Parse.TemplateVariables, msgType.Parse.Readme, nil
|
|
default:
|
|
return nil, nil, nil, xerrors.Errorf("invalid message type %q received from provisioner",
|
|
reflect.TypeOf(msg.Type).String())
|
|
}
|
|
}
|
|
}
|
|
|
|
type templateImportProvision struct {
|
|
Resources []*sdkproto.Resource
|
|
Parameters []*sdkproto.RichParameter
|
|
ExternalAuthProviders []*sdkproto.ExternalAuthProviderResource
|
|
Modules []*sdkproto.Module
|
|
Presets []*sdkproto.Preset
|
|
}
|
|
|
|
// Performs a dry-run provision when importing a template.
|
|
// This is used to detect resources that would be provisioned for a workspace in various states.
|
|
// It doesn't define values for rich parameters as they're unknown during template import.
|
|
func (r *Runner) runTemplateImportProvision(ctx context.Context, variableValues []*sdkproto.VariableValue, metadata *sdkproto.Metadata) (*templateImportProvision, error) {
|
|
return r.runTemplateImportProvisionWithRichParameters(ctx, variableValues, nil, metadata)
|
|
}
|
|
|
|
// Performs a dry-run provision with provided rich parameters.
|
|
// This is used to detect resources that would be provisioned for a workspace in various states.
|
|
func (r *Runner) runTemplateImportProvisionWithRichParameters(
|
|
ctx context.Context,
|
|
variableValues []*sdkproto.VariableValue,
|
|
richParameterValues []*sdkproto.RichParameterValue,
|
|
metadata *sdkproto.Metadata,
|
|
) (*templateImportProvision, error) {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
var stage string
|
|
switch metadata.WorkspaceTransition {
|
|
case sdkproto.WorkspaceTransition_START:
|
|
stage = "Detecting persistent resources"
|
|
case sdkproto.WorkspaceTransition_STOP:
|
|
stage = "Detecting ephemeral resources"
|
|
}
|
|
// use the notStopped so that if we attempt to gracefully cancel, the stream will still be available for us
|
|
// to send the cancel to the provisioner
|
|
err := r.session.Send(&sdkproto.Request{Type: &sdkproto.Request_Plan{Plan: &sdkproto.PlanRequest{
|
|
Metadata: metadata,
|
|
RichParameterValues: richParameterValues,
|
|
VariableValues: variableValues,
|
|
}}})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("start provision: %w", err)
|
|
}
|
|
nevermind := make(chan struct{})
|
|
defer close(nevermind)
|
|
go func() {
|
|
select {
|
|
case <-nevermind:
|
|
return
|
|
case <-r.notStopped.Done():
|
|
return
|
|
case <-r.notCanceled.Done():
|
|
_ = r.session.Send(&sdkproto.Request{
|
|
Type: &sdkproto.Request_Cancel{
|
|
Cancel: &sdkproto.CancelRequest{},
|
|
},
|
|
})
|
|
}
|
|
}()
|
|
|
|
for {
|
|
msg, err := r.session.Recv()
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("recv import provision: %w", err)
|
|
}
|
|
switch msgType := msg.Type.(type) {
|
|
case *sdkproto.Response_Log:
|
|
r.logProvisionerJobLog(context.Background(), msgType.Log.Level, "template import provision job logged",
|
|
slog.F("level", msgType.Log.Level),
|
|
slog.F("output", msgType.Log.Output),
|
|
)
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER,
|
|
Level: msgType.Log.Level,
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
Output: msgType.Log.Output,
|
|
Stage: stage,
|
|
})
|
|
case *sdkproto.Response_Plan:
|
|
c := msgType.Plan
|
|
if c.Error != "" {
|
|
r.logger.Info(context.Background(), "dry-run provision failure",
|
|
slog.F("error", c.Error),
|
|
)
|
|
|
|
return nil, xerrors.New(c.Error)
|
|
}
|
|
|
|
r.logger.Info(context.Background(), "parse dry-run provision successful",
|
|
slog.F("resource_count", len(c.Resources)),
|
|
slog.F("resources", resourceNames(c.Resources)),
|
|
)
|
|
|
|
return &templateImportProvision{
|
|
Resources: c.Resources,
|
|
Parameters: c.Parameters,
|
|
ExternalAuthProviders: c.ExternalAuthProviders,
|
|
Modules: c.Modules,
|
|
Presets: c.Presets,
|
|
}, nil
|
|
default:
|
|
return nil, xerrors.Errorf("invalid message type %q received from provisioner",
|
|
reflect.TypeOf(msg.Type).String())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) runTemplateDryRun(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
// Ensure all metadata fields are set as they are all optional for dry-run.
|
|
metadata := r.job.GetTemplateDryRun().GetMetadata()
|
|
metadata.WorkspaceTransition = sdkproto.WorkspaceTransition_START
|
|
if metadata.CoderUrl == "" {
|
|
metadata.CoderUrl = "http://localhost:3000"
|
|
}
|
|
if metadata.WorkspaceName == "" {
|
|
metadata.WorkspaceName = "dryrun"
|
|
}
|
|
metadata.WorkspaceOwner = r.job.UserName
|
|
if metadata.WorkspaceOwner == "" {
|
|
metadata.WorkspaceOwner = "dryrunner"
|
|
}
|
|
if metadata.WorkspaceId == "" {
|
|
id, err := uuid.NewRandom()
|
|
if err != nil {
|
|
return nil, r.failedJobf("generate random ID: %s", err)
|
|
}
|
|
metadata.WorkspaceId = id.String()
|
|
}
|
|
if metadata.WorkspaceOwnerId == "" {
|
|
id, err := uuid.NewRandom()
|
|
if err != nil {
|
|
return nil, r.failedJobf("generate random ID: %s", err)
|
|
}
|
|
metadata.WorkspaceOwnerId = id.String()
|
|
}
|
|
|
|
failedJob := r.configure(&sdkproto.Config{
|
|
TemplateSourceArchive: r.job.GetTemplateSourceArchive(),
|
|
})
|
|
if failedJob != nil {
|
|
return nil, failedJob
|
|
}
|
|
|
|
// Run the template import provision task since it's already a dry run.
|
|
provision, err := r.runTemplateImportProvisionWithRichParameters(ctx,
|
|
r.job.GetTemplateDryRun().GetVariableValues(),
|
|
r.job.GetTemplateDryRun().GetRichParameterValues(),
|
|
metadata,
|
|
)
|
|
if err != nil {
|
|
return nil, r.failedJobf("run dry-run provision job: %s", err)
|
|
}
|
|
|
|
return &proto.CompletedJob{
|
|
JobId: r.job.JobId,
|
|
Type: &proto.CompletedJob_TemplateDryRun_{
|
|
TemplateDryRun: &proto.CompletedJob_TemplateDryRun{
|
|
Resources: provision.Resources,
|
|
Modules: provision.Modules,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (r *Runner) buildWorkspace(ctx context.Context, stage string, req *sdkproto.Request) (
|
|
*sdkproto.Response, *proto.FailedJob,
|
|
) {
|
|
// use the notStopped so that if we attempt to gracefully cancel, the stream
|
|
// will still be available for us to send the cancel to the provisioner
|
|
err := r.session.Send(req)
|
|
if err != nil {
|
|
return nil, r.failedWorkspaceBuildf("start provision: %s", err)
|
|
}
|
|
nevermind := make(chan struct{})
|
|
defer close(nevermind)
|
|
go func() {
|
|
select {
|
|
case <-nevermind:
|
|
return
|
|
case <-r.notStopped.Done():
|
|
return
|
|
case <-r.notCanceled.Done():
|
|
_ = r.session.Send(&sdkproto.Request{
|
|
Type: &sdkproto.Request_Cancel{
|
|
Cancel: &sdkproto.CancelRequest{},
|
|
},
|
|
})
|
|
}
|
|
}()
|
|
|
|
for {
|
|
msg, err := r.session.Recv()
|
|
if err != nil {
|
|
return nil, r.failedWorkspaceBuildf("recv workspace provision: %s", err)
|
|
}
|
|
switch msgType := msg.Type.(type) {
|
|
case *sdkproto.Response_Log:
|
|
r.logProvisionerJobLog(context.Background(), msgType.Log.Level, "workspace provisioner job logged",
|
|
slog.F("level", msgType.Log.Level),
|
|
slog.F("output", msgType.Log.Output),
|
|
slog.F("workspace_build_id", r.job.GetWorkspaceBuild().WorkspaceBuildId),
|
|
)
|
|
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER,
|
|
Level: msgType.Log.Level,
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
Output: msgType.Log.Output,
|
|
Stage: stage,
|
|
})
|
|
default:
|
|
// Stop looping!
|
|
return msg, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) commitQuota(ctx context.Context, resources []*sdkproto.Resource) *proto.FailedJob {
|
|
cost := sumDailyCost(resources)
|
|
r.logger.Debug(ctx, "committing quota",
|
|
slog.F("resources", resourceNames(resources)),
|
|
slog.F("cost", cost),
|
|
)
|
|
if cost == 0 {
|
|
return nil
|
|
}
|
|
|
|
const stage = "Commit quota"
|
|
|
|
resp, err := r.quotaCommitter.CommitQuota(ctx, &proto.CommitQuotaRequest{
|
|
JobId: r.job.JobId,
|
|
DailyCost: int32(cost),
|
|
})
|
|
if err != nil {
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER,
|
|
Level: sdkproto.LogLevel_ERROR,
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
Output: fmt.Sprintf("Failed to commit quota: %+v", err),
|
|
Stage: stage,
|
|
})
|
|
return r.failedJobf("commit quota: %+v", err)
|
|
}
|
|
for _, line := range []string{
|
|
fmt.Sprintf("Build cost — %v", cost),
|
|
fmt.Sprintf("Budget — %v", resp.Budget),
|
|
fmt.Sprintf("Credits consumed — %v", resp.CreditsConsumed),
|
|
} {
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER,
|
|
Level: sdkproto.LogLevel_INFO,
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
Output: line,
|
|
Stage: stage,
|
|
})
|
|
}
|
|
|
|
if !resp.Ok {
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER,
|
|
Level: sdkproto.LogLevel_WARN,
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
Output: "This build would exceed your quota. Failing.",
|
|
Stage: stage,
|
|
})
|
|
return r.failedWorkspaceBuildf("insufficient quota")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
|
|
ctx, span := r.startTrace(ctx, tracing.FuncName())
|
|
defer span.End()
|
|
|
|
var (
|
|
applyStage string
|
|
commitQuota bool
|
|
)
|
|
switch r.job.GetWorkspaceBuild().Metadata.WorkspaceTransition {
|
|
case sdkproto.WorkspaceTransition_START:
|
|
applyStage = "Starting workspace"
|
|
commitQuota = true
|
|
case sdkproto.WorkspaceTransition_STOP:
|
|
applyStage = "Stopping workspace"
|
|
commitQuota = true
|
|
case sdkproto.WorkspaceTransition_DESTROY:
|
|
applyStage = "Destroying workspace"
|
|
}
|
|
|
|
failedJob := r.configure(&sdkproto.Config{
|
|
TemplateSourceArchive: r.job.GetTemplateSourceArchive(),
|
|
State: r.job.GetWorkspaceBuild().State,
|
|
ProvisionerLogLevel: r.job.GetWorkspaceBuild().LogLevel,
|
|
})
|
|
if failedJob != nil {
|
|
return nil, failedJob
|
|
}
|
|
|
|
resp, failed := r.buildWorkspace(ctx, "Planning infrastructure", &sdkproto.Request{
|
|
Type: &sdkproto.Request_Plan{
|
|
Plan: &sdkproto.PlanRequest{
|
|
Metadata: r.job.GetWorkspaceBuild().Metadata,
|
|
RichParameterValues: r.job.GetWorkspaceBuild().RichParameterValues,
|
|
VariableValues: r.job.GetWorkspaceBuild().VariableValues,
|
|
ExternalAuthProviders: r.job.GetWorkspaceBuild().ExternalAuthProviders,
|
|
},
|
|
},
|
|
})
|
|
if failed != nil {
|
|
return nil, failed
|
|
}
|
|
planComplete := resp.GetPlan()
|
|
if planComplete == nil {
|
|
return nil, r.failedWorkspaceBuildf("invalid message type %T received from provisioner", resp.Type)
|
|
}
|
|
if planComplete.Error != "" {
|
|
r.logger.Warn(context.Background(), "plan request failed",
|
|
slog.F("error", planComplete.Error),
|
|
)
|
|
|
|
return nil, &proto.FailedJob{
|
|
JobId: r.job.JobId,
|
|
Error: planComplete.Error,
|
|
Type: &proto.FailedJob_WorkspaceBuild_{
|
|
WorkspaceBuild: &proto.FailedJob_WorkspaceBuild{},
|
|
},
|
|
}
|
|
}
|
|
|
|
r.logger.Info(context.Background(), "plan request successful",
|
|
slog.F("resource_count", len(planComplete.Resources)),
|
|
slog.F("resources", resourceNames(planComplete.Resources)),
|
|
)
|
|
r.flushQueuedLogs(ctx)
|
|
if commitQuota {
|
|
failed = r.commitQuota(ctx, planComplete.Resources)
|
|
r.flushQueuedLogs(ctx)
|
|
if failed != nil {
|
|
return nil, failed
|
|
}
|
|
}
|
|
|
|
r.queueLog(ctx, &proto.Log{
|
|
Source: proto.LogSource_PROVISIONER_DAEMON,
|
|
Level: sdkproto.LogLevel_INFO,
|
|
Stage: applyStage,
|
|
CreatedAt: time.Now().UnixMilli(),
|
|
})
|
|
|
|
resp, failed = r.buildWorkspace(ctx, applyStage, &sdkproto.Request{
|
|
Type: &sdkproto.Request_Apply{
|
|
Apply: &sdkproto.ApplyRequest{
|
|
Metadata: r.job.GetWorkspaceBuild().Metadata,
|
|
},
|
|
},
|
|
})
|
|
if failed != nil {
|
|
return nil, failed
|
|
}
|
|
applyComplete := resp.GetApply()
|
|
if applyComplete == nil {
|
|
return nil, r.failedWorkspaceBuildf("invalid message type %T received from provisioner", resp.Type)
|
|
}
|
|
|
|
// Prepend the plan timings (since they occurred first).
|
|
applyComplete.Timings = append(planComplete.Timings, applyComplete.Timings...)
|
|
|
|
if applyComplete.Error != "" {
|
|
r.logger.Warn(context.Background(), "apply failed; updating state",
|
|
slog.F("error", applyComplete.Error),
|
|
slog.F("state_len", len(applyComplete.State)),
|
|
)
|
|
|
|
return nil, &proto.FailedJob{
|
|
JobId: r.job.JobId,
|
|
Error: applyComplete.Error,
|
|
Type: &proto.FailedJob_WorkspaceBuild_{
|
|
WorkspaceBuild: &proto.FailedJob_WorkspaceBuild{
|
|
State: applyComplete.State,
|
|
Timings: applyComplete.Timings,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
r.logger.Info(context.Background(), "apply successful",
|
|
slog.F("resource_count", len(applyComplete.Resources)),
|
|
slog.F("resources", resourceNames(applyComplete.Resources)),
|
|
slog.F("state_len", len(applyComplete.State)),
|
|
)
|
|
r.flushQueuedLogs(ctx)
|
|
|
|
return &proto.CompletedJob{
|
|
JobId: r.job.JobId,
|
|
Type: &proto.CompletedJob_WorkspaceBuild_{
|
|
WorkspaceBuild: &proto.CompletedJob_WorkspaceBuild{
|
|
State: applyComplete.State,
|
|
Resources: applyComplete.Resources,
|
|
Timings: applyComplete.Timings,
|
|
// Modules are created on disk by `terraform init`, and that is only
|
|
// called by `plan`. `apply` does not modify them, so we can use the
|
|
// modules from the plan response.
|
|
Modules: planComplete.Modules,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func resourceNames(rs []*sdkproto.Resource) []string {
|
|
var sb strings.Builder
|
|
names := make([]string, 0, len(rs))
|
|
for _, r := range rs {
|
|
_, _ = sb.WriteString(r.Type)
|
|
_, _ = sb.WriteString(".")
|
|
_, _ = sb.WriteString(r.Name)
|
|
names = append(names, sb.String())
|
|
sb.Reset()
|
|
}
|
|
return names
|
|
}
|
|
|
|
func (r *Runner) failedWorkspaceBuildf(format string, args ...interface{}) *proto.FailedJob {
|
|
failedJob := r.failedJobf(format, args...)
|
|
failedJob.Type = &proto.FailedJob_WorkspaceBuild_{}
|
|
return failedJob
|
|
}
|
|
|
|
func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob {
|
|
message := fmt.Sprintf(format, args...)
|
|
var code string
|
|
|
|
for c, m := range errorCodes {
|
|
if strings.Contains(message, m) {
|
|
code = c
|
|
break
|
|
}
|
|
}
|
|
return &proto.FailedJob{
|
|
JobId: r.job.JobId,
|
|
Error: message,
|
|
ErrorCode: code,
|
|
}
|
|
}
|
|
|
|
func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
|
return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
|
|
semconv.ServiceNameKey.String("coderd.provisionerd"),
|
|
attribute.String("job_id", r.job.JobId),
|
|
))...)
|
|
}
|
|
|
|
// queueLog adds a log to the buffer and debounces a timer
|
|
// if one exists to flush the logs. It stores a maximum of
|
|
// 100 log lines before flushing as a safe-guard mechanism.
|
|
func (r *Runner) queueLog(ctx context.Context, log *proto.Log) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
r.queuedLogs = append(r.queuedLogs, log)
|
|
if r.flushLogsTimer != nil {
|
|
r.flushLogsTimer.Reset(r.logBufferInterval)
|
|
return
|
|
}
|
|
// This can be configurable if there are a ton of logs.
|
|
if len(r.queuedLogs) > 100 {
|
|
// Flushing logs requires a lock, so this can happen async.
|
|
go r.flushQueuedLogs(ctx)
|
|
return
|
|
}
|
|
r.flushLogsTimer = time.AfterFunc(r.logBufferInterval, func() {
|
|
r.flushQueuedLogs(ctx)
|
|
})
|
|
}
|
|
|
|
func (r *Runner) flushQueuedLogs(ctx context.Context) {
|
|
r.mutex.Lock()
|
|
if r.flushLogsTimer != nil {
|
|
r.flushLogsTimer.Stop()
|
|
}
|
|
logs := r.queuedLogs
|
|
r.queuedLogs = make([]*proto.Log, 0)
|
|
r.mutex.Unlock()
|
|
_, err := r.update(ctx, &proto.UpdateJobRequest{
|
|
JobId: r.job.JobId,
|
|
Logs: logs,
|
|
})
|
|
if err != nil {
|
|
if errors.Is(err, errUpdateSkipped) {
|
|
return
|
|
}
|
|
r.logger.Error(ctx, "flush queued logs", slog.Error(err))
|
|
}
|
|
}
|
|
|
|
func redactVariableValues(variableValues []*sdkproto.VariableValue) []*sdkproto.VariableValue {
|
|
var redacted []*sdkproto.VariableValue
|
|
for _, v := range variableValues {
|
|
if v.Sensitive {
|
|
redacted = append(redacted, &sdkproto.VariableValue{
|
|
Name: v.Name,
|
|
Value: "*redacted*",
|
|
Sensitive: true,
|
|
})
|
|
continue
|
|
}
|
|
redacted = append(redacted, v)
|
|
}
|
|
return redacted
|
|
}
|
|
|
|
// logProvisionerJobLog logs a message from the provisioner daemon at the appropriate level.
|
|
func (r *Runner) logProvisionerJobLog(ctx context.Context, logLevel sdkproto.LogLevel, msg string, fields ...any) {
|
|
switch logLevel {
|
|
case sdkproto.LogLevel_TRACE:
|
|
r.logger.Debug(ctx, msg, fields...) // There's no trace, so we'll just use debug.
|
|
case sdkproto.LogLevel_DEBUG:
|
|
r.logger.Debug(ctx, msg, fields...)
|
|
case sdkproto.LogLevel_INFO:
|
|
r.logger.Info(ctx, msg, fields...)
|
|
case sdkproto.LogLevel_WARN:
|
|
r.logger.Warn(ctx, msg, fields...)
|
|
case sdkproto.LogLevel_ERROR:
|
|
r.logger.Error(ctx, msg, fields...)
|
|
default: // should never happen, but we should not explode either.
|
|
r.logger.Info(ctx, msg, fields...)
|
|
}
|
|
}
|