fix(coderd): add blocking GetProvisionerJobByIDWithLock for workspace build cancellation (#19737)

Closes https://github.com/coder/internal/issues/885

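Adds a new database method, GetProvisionerJobByIDWithLock, that uses FOR
UPDATE without SKIP LOCKED, so it waits for a locked job row instead of
skipping it. The cancel handler previously called
GetProvisionerJobByIDForUpdate and reported any error from that call as a
500; this fixes workspace build cancellation returning 500 errors when
jobs are locked.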
This commit is contained in:
Kacper Sawicki
2025-09-08 15:40:14 +02:00
committed by GitHub
parent 065c7c3d5d
commit 776231d025
9 changed files with 109 additions and 2 deletions
+12
View File
@@ -2610,6 +2610,18 @@ func (q *querier) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UU
return job, nil
}
// GetProvisionerJobByIDWithLock loads the provisioner job with the given ID
// through the wrapped store's GetProvisionerJobByIDWithLock and then runs
// q.authorizeProvisionerJob on the result.
//
// The zero-value job is returned together with the error when either the
// lookup or the authorization step fails; the fetched job is only handed
// back once both succeed.
func (q *querier) GetProvisionerJobByIDWithLock(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
	var none database.ProvisionerJob

	locked, fetchErr := q.db.GetProvisionerJobByIDWithLock(ctx, id)
	if fetchErr != nil {
		return none, fetchErr
	}

	// Authorization is checked against the fetched row, so it must happen
	// after the lookup.
	authErr := q.authorizeProvisionerJob(ctx, locked)
	if authErr != nil {
		return none, authErr
	}

	return locked, nil
}
func (q *querier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
_, err := q.GetProvisionerJobByID(ctx, jobID)
if err != nil {
+18
View File
@@ -664,6 +664,24 @@ func (s *MethodTestSuite) TestProvisionerJob() {
dbm.EXPECT().GetProvisionerLogsAfterID(gomock.Any(), arg).Return([]database.ProvisionerJobLog{}, nil).AnyTimes()
check.Args(arg).Asserts(ws, policy.ActionRead).Returns([]database.ProvisionerJobLog{})
}))
// Workspace-build case: the fake job has type WorkspaceBuild, and the mocked
// store maps the job to its build and the build to its workspace. The call is
// expected to assert policy.ActionRead on that workspace and return the job.
s.Run("Build/GetProvisionerJobByIDWithLock", s.Mocked(func(dbm *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
ws := testutil.Fake(s.T(), faker, database.Workspace{})
j := testutil.Fake(s.T(), faker, database.ProvisionerJob{Type: database.ProvisionerJobTypeWorkspaceBuild})
build := testutil.Fake(s.T(), faker, database.WorkspaceBuild{WorkspaceID: ws.ID, JobID: j.ID})
// Every lookup is stubbed with AnyTimes, so the test does not pin how often
// each store method is called.
dbm.EXPECT().GetProvisionerJobByIDWithLock(gomock.Any(), j.ID).Return(j, nil).AnyTimes()
dbm.EXPECT().GetWorkspaceBuildByJobID(gomock.Any(), j.ID).Return(build, nil).AnyTimes()
dbm.EXPECT().GetWorkspaceByID(gomock.Any(), build.WorkspaceID).Return(ws, nil).AnyTimes()
check.Args(j.ID).Asserts(ws, policy.ActionRead).Returns(j)
}))
// Template-version-import case: the fake job has type TemplateVersionImport,
// and the mocked store maps the job to a template version and that version to
// its template. The call is expected to assert policy.ActionRead on
// v.RBACObject(tpl) and return the job.
s.Run("TemplateVersion/GetProvisionerJobByIDWithLock", s.Mocked(func(dbm *dbmock.MockStore, faker *gofakeit.Faker, check *expects) {
tpl := testutil.Fake(s.T(), faker, database.Template{})
j := testutil.Fake(s.T(), faker, database.ProvisionerJob{Type: database.ProvisionerJobTypeTemplateVersionImport})
// The version carries a valid TemplateID so the template lookup below is keyed on tpl.ID.
v := testutil.Fake(s.T(), faker, database.TemplateVersion{JobID: j.ID, TemplateID: uuid.NullUUID{UUID: tpl.ID, Valid: true}})
dbm.EXPECT().GetProvisionerJobByIDWithLock(gomock.Any(), j.ID).Return(j, nil).AnyTimes()
dbm.EXPECT().GetTemplateVersionByJobID(gomock.Any(), j.ID).Return(v, nil).AnyTimes()
dbm.EXPECT().GetTemplateByID(gomock.Any(), tpl.ID).Return(tpl, nil).AnyTimes()
check.Args(j.ID).Asserts(v.RBACObject(tpl), policy.ActionRead).Returns(j)
}))
}
func (s *MethodTestSuite) TestLicense() {
@@ -1272,6 +1272,13 @@ func (m queryMetricsStore) GetProvisionerJobByIDForUpdate(ctx context.Context, i
return r0, r1
}
// GetProvisionerJobByIDWithLock forwards the call to the wrapped store and
// records how long it took, in seconds, on the queryLatencies metric under
// the "GetProvisionerJobByIDWithLock" label. The wrapped store's results are
// returned unchanged.
func (m queryMetricsStore) GetProvisionerJobByIDWithLock(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
	began := time.Now()
	job, err := m.s.GetProvisionerJobByIDWithLock(ctx, id)

	// Resolve the labelled observer first, then take the elapsed time, so the
	// measurement point matches a single chained call.
	latency := m.queryLatencies.WithLabelValues("GetProvisionerJobByIDWithLock")
	latency.Observe(time.Since(began).Seconds())

	return job, err
}
func (m queryMetricsStore) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
start := time.Now()
r0, r1 := m.s.GetProvisionerJobTimingsByJobID(ctx, jobID)
+15
View File
@@ -2670,6 +2670,21 @@ func (mr *MockStoreMockRecorder) GetProvisionerJobByIDForUpdate(ctx, id any) *go
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobByIDForUpdate", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobByIDForUpdate), ctx, id)
}
// GetProvisionerJobByIDWithLock mocks base method.
//
// The call is dispatched through the mock controller, and the two recorded
// return values are type-asserted back to (database.ProvisionerJob, error).
// A value that fails its assertion comes back as the zero value.
func (m *MockStore) GetProvisionerJobByIDWithLock(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
	m.ctrl.T.Helper()
	results := m.ctrl.Call(m, "GetProvisionerJobByIDWithLock", ctx, id)
	job, _ := results[0].(database.ProvisionerJob)
	callErr, _ := results[1].(error)
	return job, callErr
}
// GetProvisionerJobByIDWithLock indicates an expected call of GetProvisionerJobByIDWithLock.
//
// ctx and id may be concrete values or gomock matchers; they are handed to
// the controller together with the reflected type of the mocked method.
func (mr *MockStoreMockRecorder) GetProvisionerJobByIDWithLock(ctx, id any) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	methodType := reflect.TypeOf((*MockStore)(nil).GetProvisionerJobByIDWithLock)
	return mr.mock.ctrl.RecordCallWithMethodType(
		mr.mock,
		"GetProvisionerJobByIDWithLock",
		methodType,
		ctx,
		id,
	)
}
// GetProvisionerJobTimingsByJobID mocks base method.
func (m *MockStore) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
m.ctrl.T.Helper()
+3
View File
@@ -296,6 +296,9 @@ type sqlcQuerier interface {
// Gets a single provisioner job by ID for update.
// This is used to securely reap jobs that have been hung/pending for a long time.
GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (ProvisionerJob, error)
// Gets a provisioner job by ID with exclusive lock.
// Blocks until the row is available for update.
GetProvisionerJobByIDWithLock(ctx context.Context, id uuid.UUID) (ProvisionerJob, error)
GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]ProvisionerJobTiming, error)
GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error)
GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg GetProvisionerJobsByIDsWithQueuePositionParams) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error)
+41
View File
@@ -8875,6 +8875,47 @@ func (q *sqlQuerier) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid
return i, err
}
// getProvisionerJobByIDWithLock is the SQL text for the
// GetProvisionerJobByIDWithLock query. It selects a single provisioner_jobs
// row by id ($1) using FOR UPDATE with no SKIP LOCKED clause. The column
// list order must match the Scan targets in the method that executes it.
const getProvisionerJobByIDWithLock = `-- name: GetProvisionerJobByIDWithLock :one
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status, logs_length, logs_overflowed
FROM
provisioner_jobs
WHERE
id = $1
FOR UPDATE
`
// Gets a provisioner job by ID with exclusive lock.
// Blocks until the row is available for update.
//
// The query is run with id as its only argument, and the single result row is
// scanned column by column into a ProvisionerJob. The job and the Scan error
// are returned as-is.
func (q *sqlQuerier) GetProvisionerJobByIDWithLock(ctx context.Context, id uuid.UUID) (ProvisionerJob, error) {
	var job ProvisionerJob
	// Scan targets are listed in the same order as the SELECT column list.
	scanErr := q.db.QueryRowContext(ctx, getProvisionerJobByIDWithLock, id).Scan(
		&job.ID,
		&job.CreatedAt,
		&job.UpdatedAt,
		&job.StartedAt,
		&job.CanceledAt,
		&job.CompletedAt,
		&job.Error,
		&job.OrganizationID,
		&job.InitiatorID,
		&job.Provisioner,
		&job.StorageMethod,
		&job.Type,
		&job.Input,
		&job.WorkerID,
		&job.FileID,
		&job.Tags,
		&job.ErrorCode,
		&job.TraceMetadata,
		&job.JobStatus,
		&job.LogsLength,
		&job.LogsOverflowed,
	)
	return job, scanErr
}
const getProvisionerJobTimingsByJobID = `-- name: GetProvisionerJobTimingsByJobID :many
SELECT job_id, started_at, ended_at, stage, source, action, resource FROM provisioner_job_timings
WHERE job_id = $1
@@ -55,6 +55,17 @@ WHERE
FOR UPDATE
SKIP LOCKED;
-- name: GetProvisionerJobByIDWithLock :one
-- Gets a provisioner job by ID with exclusive lock.
-- Blocks until the row is available for update.
-- The FOR UPDATE clause has no SKIP LOCKED, so a row locked by another
-- transaction is waited on rather than skipped.
SELECT
*
FROM
provisioner_jobs
WHERE
id = $1
FOR UPDATE;
-- name: GetProvisionerJobsByIDs :many
SELECT
*
+1 -1
View File
@@ -663,7 +663,7 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques
return xerrors.New("user is not allowed to cancel workspace builds")
}
job, err := db.GetProvisionerJobByIDForUpdate(ctx, workspaceBuild.JobID)
job, err := db.GetProvisionerJobByIDWithLock(ctx, workspaceBuild.JobID)
if err != nil {
code = http.StatusInternalServerError
resp.Message = "Internal error fetching provisioner job."
+1 -1
View File
@@ -580,7 +580,7 @@ func TestPatchCancelWorkspaceBuild(t *testing.T) {
require.Eventually(t, func() bool {
err := client.CancelWorkspaceBuild(ctx, build.ID, codersdk.CancelWorkspaceBuildParams{})
return assert.NoError(t, err)
return err == nil
}, testutil.WaitShort, testutil.IntervalMedium)
require.Eventually(t, func() bool {