fix(coderd/database): exclude canceled jobs in queue position (#15835)

When calculating the queue position in
`GetProvisionerJobsByIDsWithQueuePosition` we only counted jobs with
`started_at = NULL`. This is misleading, as it allows canceling or
canceled jobs to take up rows in the computed queue position, giving an
impression that the queue is larger than it really is.

This modifies the query to also exclude jobs with a non-null `canceled_at`,
`completed_at`, or `error` field for the purposes of calculating the
queue position, and also adds a test to validate this behaviour.

(Note: due to the behaviour of `dbgen.ProvisionerJob` with `dbmem` I had
to use other proxy methods to validate the corresponding dbmem
implementation.)

---------

Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
This commit is contained in:
Cian Johnston
2024-12-12 12:37:45 +00:00
committed by GitHub
parent edb0cb155f
commit 36c2cf8a40
4 changed files with 221 additions and 31 deletions
+82 -25
View File
@@ -3804,35 +3804,92 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(_ context.Context
q.mutex.RLock()
defer q.mutex.RUnlock()
jobs := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0)
queuePosition := int64(1)
// WITH pending_jobs AS (
// SELECT
// id, created_at
// FROM
// provisioner_jobs
// WHERE
// started_at IS NULL
// AND
// canceled_at IS NULL
// AND
// completed_at IS NULL
// AND
// error IS NULL
// ),
type pendingJobRow struct {
ID uuid.UUID
CreatedAt time.Time
}
pendingJobs := make([]pendingJobRow, 0)
for _, job := range q.provisionerJobs {
for _, id := range ids {
if id == job.ID {
// clone the Tags before appending, since maps are reference types and
// we don't want the caller to be able to mutate the map we have inside
// dbmem!
job.Tags = maps.Clone(job.Tags)
job := database.GetProvisionerJobsByIDsWithQueuePositionRow{
ProvisionerJob: job,
}
if !job.ProvisionerJob.StartedAt.Valid {
job.QueuePosition = queuePosition
}
jobs = append(jobs, job)
break
}
}
if !job.StartedAt.Valid {
queuePosition++
if job.StartedAt.Valid ||
job.CanceledAt.Valid ||
job.CompletedAt.Valid ||
job.Error.Valid {
continue
}
pendingJobs = append(pendingJobs, pendingJobRow{
ID: job.ID,
CreatedAt: job.CreatedAt,
})
}
for _, job := range jobs {
if !job.ProvisionerJob.StartedAt.Valid {
// Set it to the max position!
job.QueueSize = queuePosition
}
// queue_position AS (
// SELECT
// id,
// ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position
// FROM
// pending_jobs
// ),
slices.SortFunc(pendingJobs, func(a, b pendingJobRow) int {
c := a.CreatedAt.Compare(b.CreatedAt)
return c
})
queuePosition := make(map[uuid.UUID]int64)
for idx, pj := range pendingJobs {
queuePosition[pj.ID] = int64(idx + 1)
}
// queue_size AS (
// SELECT COUNT(*) AS count FROM pending_jobs
// ),
queueSize := len(pendingJobs)
// SELECT
// sqlc.embed(pj),
// COALESCE(qp.queue_position, 0) AS queue_position,
// COALESCE(qs.count, 0) AS queue_size
// FROM
// provisioner_jobs pj
// LEFT JOIN
// queue_position qp ON pj.id = qp.id
// LEFT JOIN
// queue_size qs ON TRUE
// WHERE
// pj.id IN (...)
jobs := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0)
for _, job := range q.provisionerJobs {
if !slices.Contains(ids, job.ID) {
continue
}
// clone the Tags before appending, since maps are reference types and
// we don't want the caller to be able to mutate the map we have inside
// dbmem!
job.Tags = maps.Clone(job.Tags)
job := database.GetProvisionerJobsByIDsWithQueuePositionRow{
// sqlc.embed(pj),
ProvisionerJob: job,
// COALESCE(qp.queue_position, 0) AS queue_position,
QueuePosition: queuePosition[job.ID],
// COALESCE(qs.count, 0) AS queue_size
QueueSize: int64(queueSize),
}
jobs = append(jobs, job)
}
return jobs, nil
}
+121
View File
@@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"cdr.dev/slog/sloggers/slogtest"
@@ -2037,6 +2038,126 @@ func TestExpectOne(t *testing.T) {
})
}
// TestGetProvisionerJobsByIDsWithQueuePosition verifies that
// GetProvisionerJobsByIDsWithQueuePosition reports a queue position and
// queue size that only take pending jobs into account: jobs that are
// running, succeeded, canceling, canceled, or failed must get a queue
// position of 0 and must not inflate the queue size.
//
// The test is skipped unless dbtestutil.WillUsePostgres() reports true.
func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) {
	t.Parallel()
	if !dbtestutil.WillUsePostgres() {
		t.Skip("this test requires postgres")
	}

	db, _ := dbtestutil.NewDB(t)
	now := dbtime.Now()
	ctx := testutil.Context(t, testutil.WaitShort)

	// Given the following provisioner jobs:
	allJobs := []database.ProvisionerJob{
		// Pending. This will be the last in the queue because
		// it was created most recently.
		dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
			CreatedAt:   now.Add(-time.Minute),
			StartedAt:   sql.NullTime{},
			CanceledAt:  sql.NullTime{},
			CompletedAt: sql.NullTime{},
			Error:       sql.NullString{},
		}),
		// Another pending. This will come first in the queue
		// because it was created before the previous job.
		dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
			CreatedAt:   now.Add(-2 * time.Minute),
			StartedAt:   sql.NullTime{},
			CanceledAt:  sql.NullTime{},
			CompletedAt: sql.NullTime{},
			Error:       sql.NullString{},
		}),
		// Running
		dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
			CreatedAt:   now.Add(-3 * time.Minute),
			StartedAt:   sql.NullTime{Valid: true, Time: now},
			CanceledAt:  sql.NullTime{},
			CompletedAt: sql.NullTime{},
			Error:       sql.NullString{},
		}),
		// Succeeded
		dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
			CreatedAt:   now.Add(-4 * time.Minute),
			StartedAt:   sql.NullTime{Valid: true, Time: now},
			CanceledAt:  sql.NullTime{},
			CompletedAt: sql.NullTime{Valid: true, Time: now},
			Error:       sql.NullString{},
		}),
		// Canceling
		dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
			CreatedAt:   now.Add(-5 * time.Minute),
			StartedAt:   sql.NullTime{},
			CanceledAt:  sql.NullTime{Valid: true, Time: now},
			CompletedAt: sql.NullTime{},
			Error:       sql.NullString{},
		}),
		// Canceled
		dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
			CreatedAt:   now.Add(-6 * time.Minute),
			StartedAt:   sql.NullTime{},
			CanceledAt:  sql.NullTime{Valid: true, Time: now},
			CompletedAt: sql.NullTime{Valid: true, Time: now},
			Error:       sql.NullString{},
		}),
		// Failed
		dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
			CreatedAt:   now.Add(-7 * time.Minute),
			StartedAt:   sql.NullTime{},
			CanceledAt:  sql.NullTime{},
			CompletedAt: sql.NullTime{},
			Error:       sql.NullString{String: "failed", Valid: true},
		}),
	}

	// Assert invariant: the jobs are in the expected order. The queue
	// assertions below are only meaningful if each fixture really ended up
	// with the status its comment claims.
	require.Len(t, allJobs, 7, "expected 7 jobs")
	for idx, status := range []database.ProvisionerJobStatus{
		database.ProvisionerJobStatusPending,
		database.ProvisionerJobStatusPending,
		database.ProvisionerJobStatusRunning,
		database.ProvisionerJobStatusSucceeded,
		database.ProvisionerJobStatusCanceling,
		database.ProvisionerJobStatusCanceled,
		database.ProvisionerJobStatusFailed,
	} {
		require.Equal(t, status, allJobs[idx].JobStatus, "expected job %d to have status %s", idx, status)
	}

	jobIDs := make([]uuid.UUID, 0, len(allJobs))
	for _, job := range allJobs {
		jobIDs = append(jobIDs, job.ID)
	}

	// When: we fetch the jobs by their IDs
	actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
	require.NoError(t, err)
	require.Len(t, actualJobs, len(allJobs), "should return all jobs")

	// Then: the jobs should be returned in the correct order (by IDs in the input slice)
	for idx, job := range actualJobs {
		assert.EqualValues(t, allJobs[idx], job.ProvisionerJob)
	}

	// Then: the queue size should be set correctly. Only the two pending
	// jobs count towards the queue, and every returned row carries that
	// same size.
	for _, job := range actualJobs {
		// Expected value goes first so that failure output labels
		// "expected" and "actual" correctly.
		assert.EqualValues(t, 2, job.QueueSize, "should have queue size 2")
	}

	// Then: the queue position should be set correctly: pending jobs are
	// numbered from 1 by ascending creation time, everything else is 0.
	queuePositions := make([]int64, 0, len(actualJobs))
	for _, job := range actualJobs {
		queuePositions = append(queuePositions, job.QueuePosition)
	}
	assert.EqualValues(t, []int64{2, 1, 0, 0, 0, 0, 0}, queuePositions, "expected queue positions to be set correctly")
}
func TestGroupRemovalTrigger(t *testing.T) {
t.Parallel()
+9 -3
View File
@@ -5865,23 +5865,29 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI
}
const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many
WITH unstarted_jobs AS (
WITH pending_jobs AS (
SELECT
id, created_at
FROM
provisioner_jobs
WHERE
started_at IS NULL
AND
canceled_at IS NULL
AND
completed_at IS NULL
AND
error IS NULL
),
queue_position AS (
SELECT
id,
ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position
FROM
unstarted_jobs
pending_jobs
),
queue_size AS (
SELECT COUNT(*) as count FROM unstarted_jobs
SELECT COUNT(*) AS count FROM pending_jobs
)
SELECT
pj.id, pj.created_at, pj.updated_at, pj.started_at, pj.canceled_at, pj.completed_at, pj.error, pj.organization_id, pj.initiator_id, pj.provisioner, pj.storage_method, pj.type, pj.input, pj.worker_id, pj.file_id, pj.tags, pj.error_code, pj.trace_metadata, pj.job_status,
+9 -3
View File
@@ -50,23 +50,29 @@ WHERE
id = ANY(@ids :: uuid [ ]);
-- name: GetProvisionerJobsByIDsWithQueuePosition :many
WITH unstarted_jobs AS (
WITH pending_jobs AS (
SELECT
id, created_at
FROM
provisioner_jobs
WHERE
started_at IS NULL
AND
canceled_at IS NULL
AND
completed_at IS NULL
AND
error IS NULL
),
queue_position AS (
SELECT
id,
ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position
FROM
unstarted_jobs
pending_jobs
),
queue_size AS (
SELECT COUNT(*) as count FROM unstarted_jobs
SELECT COUNT(*) AS count FROM pending_jobs
)
SELECT
sqlc.embed(pj),