mirror of
https://github.com/coder/coder.git
synced 2026-06-03 21:18:24 +00:00
d17dd5d787
Relates to https://github.com/coder/internal/issues/934

This PR provides a mechanism to filter provisioner jobs according to who initiated the job. This will be used to find pending prebuild jobs when prebuilds have overwhelmed the provisioner job queue, so that they can then be canceled.

If prebuilds are overwhelming provisioners, the following steps will be taken:

```bash
# pause prebuild reconciliation to limit provisioner queue pollution:
coder prebuilds pause
# cancel pending provisioner jobs to clear the queue
coder provisioner jobs list --initiator="prebuilds" --status="pending" | jq ... | xargs -n1 -I{} coder provisioner jobs cancel {}
# push a fixed template and wait for the import to complete
coder templates push ... # push a fixed template
# resume prebuild reconciliation
coder prebuilds resume
```

This interface differs somewhat from what was specified in the issue, but it still provides a mechanism that addresses the issue. The original proposal was my own, and this simpler implementation makes sense. I might add a `--search` parameter in a follow-up if there is appetite for it.

Potential follow-ups:

* Support for this usage: `coder provisioner jobs list --search "initiator:prebuilds status:pending"`
* Adding the same parameters to `coder provisioner jobs cancel` as a convenience feature, so that operators don't have to pipe through `jq` and `xargs`
347 lines
9.2 KiB
SQL
347 lines
9.2 KiB
SQL
-- Acquires the lock for a single job that isn't started, completed, or
-- canceled, and that matches an array of provisioner types.
--
-- Jobs initiated by the prebuilds user (c42fdf75-3097-471c-8c33-fb52454d81c0)
-- are handed out only after all other matching jobs; within each group the
-- oldest job (by created_at) is chosen first.
--
-- SKIP LOCKED is used to jump over locked rows. This prevents
-- multiple provisioners from acquiring the same jobs. See:
-- https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
-- name: AcquireProvisionerJob :one
UPDATE
	provisioner_jobs
SET
	started_at = @started_at,
	updated_at = @started_at,
	worker_id = @worker_id
WHERE
	id = (
		SELECT
			id
		FROM
			provisioner_jobs AS potential_job
		WHERE
			potential_job.started_at IS NULL
			-- Canceling a job that has not started sets canceled_at/completed_at
			-- but leaves started_at NULL (see UpdateProvisionerJobWithCancelByID),
			-- so started_at alone is not enough to exclude finished jobs.
			AND potential_job.canceled_at IS NULL
			AND potential_job.completed_at IS NULL
			AND potential_job.organization_id = @organization_id
			-- Ensure the caller has the correct provisioner.
			AND potential_job.provisioner = ANY(@types :: provisioner_type [ ])
			-- elsewhere, we use the tagset type, but here we use jsonb for backward compatibility
			-- they are aliases and the code that calls this query already relies on a different type
			AND provisioner_tagset_contains(@provisioner_tags :: jsonb, potential_job.tags :: jsonb)
		ORDER BY
			-- Ensure that human-initiated jobs are prioritized over prebuilds
			-- (false sorts before true, so non-prebuild jobs come first).
			potential_job.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC,
			potential_job.created_at ASC
		FOR UPDATE
		SKIP LOCKED
		LIMIT
			1
	) RETURNING *;

-- name: GetProvisionerJobByID :one
-- Fetches one provisioner job by primary key; no row lock is taken.
SELECT * FROM provisioner_jobs WHERE id = $1;

-- name: GetProvisionerJobByIDForUpdate :one
-- Gets a single provisioner job by ID for update.
-- This is used to securely reap jobs that have been hung/pending for a long time.
-- If another transaction already holds the row lock, SKIP LOCKED makes this
-- return no row rather than waiting for the lock.
SELECT *
FROM provisioner_jobs
WHERE id = $1
FOR UPDATE SKIP LOCKED;

-- name: GetProvisionerJobByIDWithLock :one
-- Gets a provisioner job by ID with exclusive lock.
-- Unlike GetProvisionerJobByIDForUpdate there is no SKIP LOCKED here, so the
-- statement blocks until any competing lock on the row is released.
SELECT *
FROM provisioner_jobs
WHERE id = $1
FOR UPDATE;

-- name: GetProvisionerJobsByIDs :many
-- Returns every provisioner job whose ID is in @ids; row order is unspecified.
SELECT *
FROM provisioner_jobs
WHERE id = ANY(@ids::uuid[]);

-- name: GetProvisionerJobsByIDsWithQueuePosition :many
-- Returns the requested jobs with a queue position and queue size.
-- Every pending job is ranked once per online provisioner daemon whose tags it
-- satisfies; a job reports its best (lowest) position and the largest queue it
-- appears in. Requested jobs with no ranking (not pending, or no online daemon
-- matches) report 0 for both values. Output is ordered by created_at.
WITH requested_jobs AS (
	-- Narrow to the requested IDs before the expensive join below.
	SELECT id, created_at
	FROM provisioner_jobs
	WHERE id = ANY(@ids::uuid[])
),
pending_jobs AS (
	SELECT id, initiator_id, created_at, tags
	FROM provisioner_jobs
	WHERE job_status = 'pending'
),
online_provisioner_daemons AS (
	-- A daemon is considered online if it was seen within @stale_interval_ms.
	SELECT d.id, d.tags
	FROM provisioner_daemons AS d
	WHERE d.last_seen_at IS NOT NULL
		AND d.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
),
ranked_jobs AS (
	-- One row per (pending job, capable daemon) pair. Jobs from the prebuilds
	-- initiator (c42fdf75-...) rank after all other jobs on the same daemon.
	SELECT
		pend.id,
		pend.created_at,
		ROW_NUMBER() OVER (
			PARTITION BY d.id
			ORDER BY
				pend.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC,
				pend.created_at ASC
		) AS queue_position,
		COUNT(*) OVER (PARTITION BY d.id) AS queue_size
	FROM pending_jobs AS pend
	INNER JOIN online_provisioner_daemons AS d
		ON provisioner_tagset_contains(d.tags, pend.tags)
),
job_queue_stats AS (
	-- Collapse the per-daemon rankings to one row per requested job.
	SELECT
		rq.id,
		rq.created_at,
		COALESCE(MIN(rk.queue_position), 0)::BIGINT AS queue_position,
		COALESCE(MAX(rk.queue_size), 0)::BIGINT AS queue_size
	FROM requested_jobs AS rq
	LEFT JOIN ranked_jobs AS rk
		ON rk.id = rq.id
	GROUP BY rq.id, rq.created_at
)
SELECT
	stats.id,
	stats.created_at,
	sqlc.embed(pj),
	stats.queue_position,
	stats.queue_size
FROM job_queue_stats AS stats
-- Joining provisioner_jobs as pj supplies the full row and is required for
-- sqlc.embed(pj) to compile.
INNER JOIN provisioner_jobs AS pj
	ON pj.id = stats.id
ORDER BY stats.created_at;

-- name: GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner :many
-- Lists an organization's provisioner jobs, newest first, together with queue
-- information, candidate provisioner daemons and template/workspace details.
-- Each optional filter is skipped when given its "empty" value:
--   @ids / @status  -> empty array
--   @tags           -> 'null'
--   @initiator_id   -> the nil UUID
--   limit           -> NULL (PostgreSQL treats LIMIT NULL as no limit)
WITH pending_jobs AS (
	-- NOTE: this CTE is not filtered by @organization_id, so queue_position and
	-- queue_size are computed over pending jobs from every organization.
	SELECT
		id, initiator_id, created_at
	FROM
		provisioner_jobs
	WHERE
		started_at IS NULL
	AND
		canceled_at IS NULL
	AND
		completed_at IS NULL
	AND
		error IS NULL
),
queue_position AS (
	SELECT
		id,
		-- Jobs from the prebuilds initiator (c42fdf75-...) queue behind all
		-- other jobs; false sorts before true.
		ROW_NUMBER() OVER (ORDER BY initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, created_at ASC) AS queue_position
	FROM
		pending_jobs
),
queue_size AS (
	SELECT COUNT(*) AS count FROM pending_jobs
)
SELECT
	sqlc.embed(pj),
	COALESCE(qp.queue_position, 0) AS queue_position,
	COALESCE(qs.count, 0) AS queue_size,
	-- Use subquery to utilize ORDER BY in array_agg since it cannot be
	-- combined with FILTER.
	(
		SELECT
			-- Order for stable output.
			array_agg(pd.id ORDER BY pd.created_at ASC)::uuid[]
		FROM
			provisioner_daemons pd
		WHERE
			-- See AcquireProvisionerJob.
			pj.started_at IS NULL
			AND pj.organization_id = pd.organization_id
			AND pj.provisioner = ANY(pd.provisioners)
			AND provisioner_tagset_contains(pd.tags, pj.tags)
	) AS available_workers,
	-- Include template and workspace information.
	COALESCE(tv.name, '') AS template_version_name,
	t.id AS template_id,
	COALESCE(t.name, '') AS template_name,
	COALESCE(t.display_name, '') AS template_display_name,
	COALESCE(t.icon, '') AS template_icon,
	w.id AS workspace_id,
	COALESCE(w.name, '') AS workspace_name,
	-- Include the name of the provisioner_daemon associated to the job
	COALESCE(pd.name, '') AS worker_name
FROM
	provisioner_jobs pj
LEFT JOIN
	queue_position qp ON qp.id = pj.id
LEFT JOIN
	queue_size qs ON TRUE
LEFT JOIN
	workspace_builds wb ON wb.id = CASE WHEN pj.input ? 'workspace_build_id' THEN (pj.input->>'workspace_build_id')::uuid END
LEFT JOIN
	workspaces w ON (
		w.id = wb.workspace_id
		AND w.organization_id = pj.organization_id
	)
LEFT JOIN
	-- We should always have a template version, either explicitly or implicitly via workspace build.
	template_versions tv ON (
		tv.id = CASE WHEN pj.input ? 'template_version_id' THEN (pj.input->>'template_version_id')::uuid ELSE wb.template_version_id END
		AND tv.organization_id = pj.organization_id
	)
LEFT JOIN
	templates t ON (
		t.id = tv.template_id
		AND t.organization_id = pj.organization_id
	)
LEFT JOIN
	-- Join to get the daemon name corresponding to the job's worker_id
	provisioner_daemons pd ON pd.id = pj.worker_id
WHERE
	pj.organization_id = @organization_id::uuid
	AND (COALESCE(array_length(@ids::uuid[], 1), 0) = 0 OR pj.id = ANY(@ids::uuid[]))
	AND (COALESCE(array_length(@status::provisioner_job_status[], 1), 0) = 0 OR pj.job_status = ANY(@status::provisioner_job_status[]))
	AND (@tags::tagset = 'null'::tagset OR provisioner_tagset_contains(pj.tags::tagset, @tags::tagset))
	AND (@initiator_id::uuid = '00000000-0000-0000-0000-000000000000'::uuid OR pj.initiator_id = @initiator_id::uuid)
GROUP BY
	pj.id,
	qp.queue_position,
	qs.count,
	tv.name,
	t.id,
	t.name,
	t.display_name,
	t.icon,
	w.id,
	w.name,
	pd.name
ORDER BY
	pj.created_at DESC,
	-- Tie-break on the primary key so jobs sharing a created_at are returned
	-- in a deterministic order and LIMIT always cuts at the same row.
	pj.id DESC
LIMIT
	sqlc.narg('limit')::int;

-- name: GetProvisionerJobsCreatedAfter :many
-- Returns all provisioner jobs whose created_at is strictly later than $1.
SELECT *
FROM provisioner_jobs
WHERE created_at > $1;

-- name: InsertProvisionerJob :one
-- Inserts a new provisioner job and returns the stored row. The positional
-- parameters map, in order, onto the column list.
INSERT INTO provisioner_jobs (
	id, created_at, updated_at, organization_id, initiator_id,
	provisioner, storage_method, file_id, "type", "input",
	tags, trace_metadata, logs_overflowed
)
VALUES (
	$1, $2, $3, $4, $5,
	$6, $7, $8, $9, $10,
	$11, $12, $13
)
RETURNING *;

-- name: UpdateProvisionerJobByID :exec
-- Bumps only the updated_at timestamp of job $1 to $2.
UPDATE provisioner_jobs
SET updated_at = $2
WHERE id = $1;

-- name: UpdateProvisionerJobWithCancelByID :exec
-- Records cancellation of job $1: sets canceled_at to $2 and completed_at to $3.
-- started_at and updated_at are left unchanged.
UPDATE provisioner_jobs
SET
	canceled_at = $2,
	completed_at = $3
WHERE id = $1;

-- name: UpdateProvisionerJobWithCompleteByID :exec
-- Marks job $1 finished: updated_at = $2, completed_at = $3, and the
-- (possibly NULL) error text and error code from $4 / $5.
UPDATE provisioner_jobs
SET
	updated_at = $2,
	completed_at = $3,
	error = $4,
	error_code = $5
WHERE id = $1;

-- name: UpdateProvisionerJobWithCompleteWithStartedAtByID :exec
-- Same as UpdateProvisionerJobWithCompleteByID, but additionally writes
-- started_at ($6), for jobs being finalized without ever having been started.
UPDATE provisioner_jobs
SET
	updated_at = $2,
	completed_at = $3,
	error = $4,
	error_code = $5,
	started_at = $6
WHERE id = $1;

-- name: GetProvisionerJobsToBeReaped :many
-- Selects unfinished jobs that look stuck:
--   * never started, and updated_at is older than @pending_since;
--   * started but not completed, and updated_at is older than @hung_since.
-- Results are shuffled and capped at @max_jobs so that repeated reaper passes
-- don't keep attempting the same jobs first.
SELECT *
FROM provisioner_jobs
WHERE
	completed_at IS NULL
	AND (
		(started_at IS NULL AND updated_at < @pending_since)
		OR (started_at IS NOT NULL AND updated_at < @hung_since)
	)
ORDER BY random()
LIMIT @max_jobs;

-- name: InsertProvisionerJobTimings :many
-- Bulk-inserts timing rows for the job @job_id. The six arrays are unnested
-- side by side, so element i of each array forms row i (callers presumably
-- pass arrays of equal length — confirm at call sites).
INSERT INTO provisioner_job_timings (
	job_id, started_at, ended_at, stage, source, action, resource
)
SELECT
	@job_id::uuid AS provisioner_job_id,
	unnest(@started_at::timestamptz[]),
	unnest(@ended_at::timestamptz[]),
	unnest(@stage::provisioner_job_timing_stage[]),
	unnest(@source::text[]),
	unnest(@action::text[]),
	unnest(@resource::text[])
RETURNING *;

-- name: GetProvisionerJobTimingsByJobID :many
-- Lists the timing entries recorded for job $1, earliest start first.
SELECT *
FROM provisioner_job_timings
WHERE job_id = $1
ORDER BY started_at;