feat: Add parameter and jobs database schema (#81)

* feat: Add parameter and jobs database schema

This modifies a prior migration which is typically forbidden,
but because we're pre-production deployment I felt grouping
would be helpful to future contributors.

This adds database functions that are required for the provisioner
daemon and job queue logic.

* Add comment to acquire provisioner job query

* PostgreSQL hates running in parallel
This commit is contained in:
Kyle Carberry
2022-01-29 17:38:32 -06:00
committed by GitHub
parent 599ea2a331
commit b503c8b099
11 changed files with 1174 additions and 63 deletions
+528 -23
View File
@@ -13,6 +13,66 @@ import (
"github.com/lib/pq"
)
const acquireProvisionerJob = `-- name: AcquireProvisionerJob :one
UPDATE
provisioner_job
SET
started_at = $1,
updated_at = $1,
worker_id = $2
WHERE
id = (
SELECT
id
FROM
provisioner_job AS nested
WHERE
nested.started_at IS NULL
AND nested.cancelled_at IS NULL
AND nested.completed_at IS NULL
AND nested.provisioner = ANY($3 :: provisioner_type [ ])
ORDER BY
nested.created FOR
UPDATE
SKIP LOCKED
LIMIT
1
) RETURNING id, created_at, updated_at, started_at, cancelled_at, completed_at, error, initiator_id, provisioner, type, project_id, input, worker_id
`
type AcquireProvisionerJobParams struct {
StartedAt sql.NullTime `db:"started_at" json:"started_at"`
WorkerID uuid.NullUUID `db:"worker_id" json:"worker_id"`
Types []ProvisionerType `db:"types" json:"types"`
}
// Acquires the lock for a single job that isn't started, completed,
// cancelled, and that matches an array of provisioner types.
//
// SKIP LOCKED is used to jump over locked rows. This prevents
// multiple provisioners from acquiring the same jobs. See:
// https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
func (q *sqlQuerier) AcquireProvisionerJob(ctx context.Context, arg AcquireProvisionerJobParams) (ProvisionerJob, error) {
row := q.db.QueryRowContext(ctx, acquireProvisionerJob, arg.StartedAt, arg.WorkerID, pq.Array(arg.Types))
var i ProvisionerJob
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.StartedAt,
&i.CancelledAt,
&i.CompletedAt,
&i.Error,
&i.InitiatorID,
&i.Provisioner,
&i.Type,
&i.ProjectID,
&i.Input,
&i.WorkerID,
)
return i, err
}
const getAPIKeyByID = `-- name: GetAPIKeyByID :one
SELECT
id, hashed_secret, user_id, application, name, last_used, expires_at, created_at, updated_at, login_type, oidc_access_token, oidc_refresh_token, oidc_id_token, oidc_expiry, devurl_token
@@ -47,6 +107,33 @@ func (q *sqlQuerier) GetAPIKeyByID(ctx context.Context, id string) (APIKey, erro
return i, err
}
const getOrganizationByID = `-- name: GetOrganizationByID :one
SELECT
id, name, description, created_at, updated_at, "default", auto_off_threshold, cpu_provisioning_rate, memory_provisioning_rate, workspace_auto_off
FROM
organizations
WHERE
id = $1
`
func (q *sqlQuerier) GetOrganizationByID(ctx context.Context, id string) (Organization, error) {
row := q.db.QueryRowContext(ctx, getOrganizationByID, id)
var i Organization
err := row.Scan(
&i.ID,
&i.Name,
&i.Description,
&i.CreatedAt,
&i.UpdatedAt,
&i.Default,
&i.AutoOffThreshold,
&i.CpuProvisioningRate,
&i.MemoryProvisioningRate,
&i.WorkspaceAutoOff,
)
return i, err
}
const getOrganizationByName = `-- name: GetOrganizationByName :one
SELECT
id, name, description, created_at, updated_at, "default", auto_off_threshold, cpu_provisioning_rate, memory_provisioning_rate, workspace_auto_off
@@ -156,6 +243,55 @@ func (q *sqlQuerier) GetOrganizationsByUserID(ctx context.Context, userID string
return items, nil
}
const getParameterValuesByScope = `-- name: GetParameterValuesByScope :many
SELECT
id, name, created_at, updated_at, scope, scope_id, source_scheme, source_value, destination_scheme, destination_value
FROM
parameter_value
WHERE
scope = $1
AND scope_id = $2
`
type GetParameterValuesByScopeParams struct {
Scope ParameterScope `db:"scope" json:"scope"`
ScopeID string `db:"scope_id" json:"scope_id"`
}
func (q *sqlQuerier) GetParameterValuesByScope(ctx context.Context, arg GetParameterValuesByScopeParams) ([]ParameterValue, error) {
rows, err := q.db.QueryContext(ctx, getParameterValuesByScope, arg.Scope, arg.ScopeID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []ParameterValue
for rows.Next() {
var i ParameterValue
if err := rows.Scan(
&i.ID,
&i.Name,
&i.CreatedAt,
&i.UpdatedAt,
&i.Scope,
&i.ScopeID,
&i.SourceScheme,
&i.SourceValue,
&i.DestinationScheme,
&i.DestinationValue,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getProjectByID = `-- name: GetProjectByID :one
SELECT
id, created_at, updated_at, organization_id, name, provisioner, active_version_id
@@ -282,6 +418,56 @@ func (q *sqlQuerier) GetProjectHistoryByProjectID(ctx context.Context, projectID
return items, nil
}
const getProjectParametersByHistoryID = `-- name: GetProjectParametersByHistoryID :many
SELECT
id, created_at, project_history_id, name, description, default_source_scheme, default_source_value, allow_override_source, default_destination_scheme, default_destination_value, allow_override_destination, default_refresh, redisplay_value, validation_error, validation_condition, validation_type_system, validation_value_type
FROM
project_parameter
WHERE
project_history_id = $1
`
func (q *sqlQuerier) GetProjectParametersByHistoryID(ctx context.Context, projectHistoryID uuid.UUID) ([]ProjectParameter, error) {
rows, err := q.db.QueryContext(ctx, getProjectParametersByHistoryID, projectHistoryID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []ProjectParameter
for rows.Next() {
var i ProjectParameter
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.ProjectHistoryID,
&i.Name,
&i.Description,
&i.DefaultSourceScheme,
&i.DefaultSourceValue,
&i.AllowOverrideSource,
&i.DefaultDestinationScheme,
&i.DefaultDestinationValue,
&i.AllowOverrideDestination,
&i.DefaultRefresh,
&i.RedisplayValue,
&i.ValidationError,
&i.ValidationCondition,
&i.ValidationTypeSystem,
&i.ValidationValueType,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getProjectsByOrganizationIDs = `-- name: GetProjectsByOrganizationIDs :many
SELECT
id, created_at, updated_at, organization_id, name, provisioner, active_version_id
@@ -322,6 +508,58 @@ func (q *sqlQuerier) GetProjectsByOrganizationIDs(ctx context.Context, ids []str
return items, nil
}
const getProvisionerDaemonByID = `-- name: GetProvisionerDaemonByID :one
SELECT
id, created_at, updated_at, name, provisioners
FROM
provisioner_daemon
WHERE
id = $1
`
func (q *sqlQuerier) GetProvisionerDaemonByID(ctx context.Context, id uuid.UUID) (ProvisionerDaemon, error) {
row := q.db.QueryRowContext(ctx, getProvisionerDaemonByID, id)
var i ProvisionerDaemon
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Name,
pq.Array(&i.Provisioners),
)
return i, err
}
const getProvisionerJobByID = `-- name: GetProvisionerJobByID :one
SELECT
id, created_at, updated_at, started_at, cancelled_at, completed_at, error, initiator_id, provisioner, type, project_id, input, worker_id
FROM
provisioner_job
WHERE
id = $1
`
func (q *sqlQuerier) GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (ProvisionerJob, error) {
row := q.db.QueryRowContext(ctx, getProvisionerJobByID, id)
var i ProvisionerJob
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.StartedAt,
&i.CancelledAt,
&i.CompletedAt,
&i.Error,
&i.InitiatorID,
&i.Provisioner,
&i.Type,
&i.ProjectID,
&i.Input,
&i.WorkerID,
)
return i, err
}
const getUserByEmailOrUsername = `-- name: GetUserByEmailOrUsername :one
SELECT
id, email, name, revoked, login_type, hashed_password, created_at, updated_at, temporary_password, avatar_hash, ssh_key_regenerated_at, username, dotfiles_git_uri, roles, status, relatime, gpg_key_regenerated_at, _decomissioned, shell
@@ -457,6 +695,31 @@ func (q *sqlQuerier) GetWorkspaceAgentsByResourceIDs(ctx context.Context, ids []
return items, nil
}
const getWorkspaceByID = `-- name: GetWorkspaceByID :one
SELECT
id, created_at, updated_at, owner_id, project_id, name
FROM
workspace
WHERE
id = $1
LIMIT
1
`
func (q *sqlQuerier) GetWorkspaceByID(ctx context.Context, id uuid.UUID) (Workspace, error) {
row := q.db.QueryRowContext(ctx, getWorkspaceByID, id)
var i Workspace
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.OwnerID,
&i.ProjectID,
&i.Name,
)
return i, err
}
const getWorkspaceByUserIDAndName = `-- name: GetWorkspaceByUserIDAndName :one
SELECT
id, created_at, updated_at, owner_id, project_id, name
@@ -486,6 +749,37 @@ func (q *sqlQuerier) GetWorkspaceByUserIDAndName(ctx context.Context, arg GetWor
return i, err
}
const getWorkspaceHistoryByID = `-- name: GetWorkspaceHistoryByID :one
SELECT
id, created_at, updated_at, completed_at, workspace_id, project_history_id, before_id, after_id, transition, initiator, provisioner_state, provision_job_id
FROM
workspace_history
WHERE
id = $1
LIMIT
1
`
func (q *sqlQuerier) GetWorkspaceHistoryByID(ctx context.Context, id uuid.UUID) (WorkspaceHistory, error) {
row := q.db.QueryRowContext(ctx, getWorkspaceHistoryByID, id)
var i WorkspaceHistory
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.CompletedAt,
&i.WorkspaceID,
&i.ProjectHistoryID,
&i.BeforeID,
&i.AfterID,
&i.Transition,
&i.Initiator,
&i.ProvisionerState,
&i.ProvisionJobID,
)
return i, err
}
const getWorkspaceHistoryByWorkspaceID = `-- name: GetWorkspaceHistoryByWorkspaceID :many
SELECT
id, created_at, updated_at, completed_at, workspace_id, project_history_id, before_id, after_id, transition, initiator, provisioner_state, provision_job_id
@@ -862,6 +1156,66 @@ func (q *sqlQuerier) InsertOrganizationMember(ctx context.Context, arg InsertOrg
return i, err
}
const insertParameterValue = `-- name: InsertParameterValue :one
INSERT INTO
parameter_value (
id,
name,
created_at,
updated_at,
scope,
scope_id,
source_scheme,
source_value,
destination_scheme,
destination_value
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id, name, created_at, updated_at, scope, scope_id, source_scheme, source_value, destination_scheme, destination_value
`
type InsertParameterValueParams struct {
ID uuid.UUID `db:"id" json:"id"`
Name string `db:"name" json:"name"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
Scope ParameterScope `db:"scope" json:"scope"`
ScopeID string `db:"scope_id" json:"scope_id"`
SourceScheme ParameterSourceScheme `db:"source_scheme" json:"source_scheme"`
SourceValue string `db:"source_value" json:"source_value"`
DestinationScheme ParameterDestinationScheme `db:"destination_scheme" json:"destination_scheme"`
DestinationValue string `db:"destination_value" json:"destination_value"`
}
func (q *sqlQuerier) InsertParameterValue(ctx context.Context, arg InsertParameterValueParams) (ParameterValue, error) {
row := q.db.QueryRowContext(ctx, insertParameterValue,
arg.ID,
arg.Name,
arg.CreatedAt,
arg.UpdatedAt,
arg.Scope,
arg.ScopeID,
arg.SourceScheme,
arg.SourceValue,
arg.DestinationScheme,
arg.DestinationValue,
)
var i ParameterValue
err := row.Scan(
&i.ID,
&i.Name,
&i.CreatedAt,
&i.UpdatedAt,
&i.Scope,
&i.ScopeID,
&i.SourceScheme,
&i.SourceValue,
&i.DestinationScheme,
&i.DestinationValue,
)
return i, err
}
const insertProject = `-- name: InsertProject :one
INSERT INTO
project (
@@ -971,9 +1325,11 @@ INSERT INTO
project_history_id,
name,
description,
default_source,
default_source_scheme,
default_source_value,
allow_override_source,
default_destination,
default_destination_scheme,
default_destination_value,
allow_override_destination,
default_refresh,
redisplay_value,
@@ -998,26 +1354,30 @@ VALUES
$12,
$13,
$14,
$15
) RETURNING id, created_at, project_history_id, name, description, default_source, allow_override_source, default_destination, allow_override_destination, default_refresh, redisplay_value, validation_error, validation_condition, validation_type_system, validation_value_type
$15,
$16,
$17
) RETURNING id, created_at, project_history_id, name, description, default_source_scheme, default_source_value, allow_override_source, default_destination_scheme, default_destination_value, allow_override_destination, default_refresh, redisplay_value, validation_error, validation_condition, validation_type_system, validation_value_type
`
type InsertProjectParameterParams struct {
ID uuid.UUID `db:"id" json:"id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
ProjectHistoryID uuid.UUID `db:"project_history_id" json:"project_history_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
DefaultSource sql.NullString `db:"default_source" json:"default_source"`
AllowOverrideSource bool `db:"allow_override_source" json:"allow_override_source"`
DefaultDestination sql.NullString `db:"default_destination" json:"default_destination"`
AllowOverrideDestination bool `db:"allow_override_destination" json:"allow_override_destination"`
DefaultRefresh string `db:"default_refresh" json:"default_refresh"`
RedisplayValue bool `db:"redisplay_value" json:"redisplay_value"`
ValidationError string `db:"validation_error" json:"validation_error"`
ValidationCondition string `db:"validation_condition" json:"validation_condition"`
ValidationTypeSystem ParameterTypeSystem `db:"validation_type_system" json:"validation_type_system"`
ValidationValueType string `db:"validation_value_type" json:"validation_value_type"`
ID uuid.UUID `db:"id" json:"id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
ProjectHistoryID uuid.UUID `db:"project_history_id" json:"project_history_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
DefaultSourceScheme ParameterSourceScheme `db:"default_source_scheme" json:"default_source_scheme"`
DefaultSourceValue sql.NullString `db:"default_source_value" json:"default_source_value"`
AllowOverrideSource bool `db:"allow_override_source" json:"allow_override_source"`
DefaultDestinationScheme ParameterDestinationScheme `db:"default_destination_scheme" json:"default_destination_scheme"`
DefaultDestinationValue sql.NullString `db:"default_destination_value" json:"default_destination_value"`
AllowOverrideDestination bool `db:"allow_override_destination" json:"allow_override_destination"`
DefaultRefresh string `db:"default_refresh" json:"default_refresh"`
RedisplayValue bool `db:"redisplay_value" json:"redisplay_value"`
ValidationError string `db:"validation_error" json:"validation_error"`
ValidationCondition string `db:"validation_condition" json:"validation_condition"`
ValidationTypeSystem ParameterTypeSystem `db:"validation_type_system" json:"validation_type_system"`
ValidationValueType string `db:"validation_value_type" json:"validation_value_type"`
}
func (q *sqlQuerier) InsertProjectParameter(ctx context.Context, arg InsertProjectParameterParams) (ProjectParameter, error) {
@@ -1027,9 +1387,11 @@ func (q *sqlQuerier) InsertProjectParameter(ctx context.Context, arg InsertProje
arg.ProjectHistoryID,
arg.Name,
arg.Description,
arg.DefaultSource,
arg.DefaultSourceScheme,
arg.DefaultSourceValue,
arg.AllowOverrideSource,
arg.DefaultDestination,
arg.DefaultDestinationScheme,
arg.DefaultDestinationValue,
arg.AllowOverrideDestination,
arg.DefaultRefresh,
arg.RedisplayValue,
@@ -1045,9 +1407,11 @@ func (q *sqlQuerier) InsertProjectParameter(ctx context.Context, arg InsertProje
&i.ProjectHistoryID,
&i.Name,
&i.Description,
&i.DefaultSource,
&i.DefaultSourceScheme,
&i.DefaultSourceValue,
&i.AllowOverrideSource,
&i.DefaultDestination,
&i.DefaultDestinationScheme,
&i.DefaultDestinationValue,
&i.AllowOverrideDestination,
&i.DefaultRefresh,
&i.RedisplayValue,
@@ -1059,6 +1423,95 @@ func (q *sqlQuerier) InsertProjectParameter(ctx context.Context, arg InsertProje
return i, err
}
const insertProvisionerDaemon = `-- name: InsertProvisionerDaemon :one
INSERT INTO
provisioner_daemon (id, created_at, name, provisioners)
VALUES
($1, $2, $3, $4) RETURNING id, created_at, updated_at, name, provisioners
`
type InsertProvisionerDaemonParams struct {
ID uuid.UUID `db:"id" json:"id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
Name string `db:"name" json:"name"`
Provisioners []ProvisionerType `db:"provisioners" json:"provisioners"`
}
func (q *sqlQuerier) InsertProvisionerDaemon(ctx context.Context, arg InsertProvisionerDaemonParams) (ProvisionerDaemon, error) {
row := q.db.QueryRowContext(ctx, insertProvisionerDaemon,
arg.ID,
arg.CreatedAt,
arg.Name,
pq.Array(arg.Provisioners),
)
var i ProvisionerDaemon
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Name,
pq.Array(&i.Provisioners),
)
return i, err
}
const insertProvisionerJob = `-- name: InsertProvisionerJob :one
INSERT INTO
provisioner_job (
id,
created_at,
updated_at,
initiator_id,
provisioner,
type,
project_id,
input
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id, created_at, updated_at, started_at, cancelled_at, completed_at, error, initiator_id, provisioner, type, project_id, input, worker_id
`
type InsertProvisionerJobParams struct {
ID uuid.UUID `db:"id" json:"id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
InitiatorID string `db:"initiator_id" json:"initiator_id"`
Provisioner ProvisionerType `db:"provisioner" json:"provisioner"`
Type ProvisionerJobType `db:"type" json:"type"`
ProjectID uuid.UUID `db:"project_id" json:"project_id"`
Input json.RawMessage `db:"input" json:"input"`
}
func (q *sqlQuerier) InsertProvisionerJob(ctx context.Context, arg InsertProvisionerJobParams) (ProvisionerJob, error) {
row := q.db.QueryRowContext(ctx, insertProvisionerJob,
arg.ID,
arg.CreatedAt,
arg.UpdatedAt,
arg.InitiatorID,
arg.Provisioner,
arg.Type,
arg.ProjectID,
arg.Input,
)
var i ProvisionerJob
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.StartedAt,
&i.CancelledAt,
&i.CompletedAt,
&i.Error,
&i.InitiatorID,
&i.Provisioner,
&i.Type,
&i.ProjectID,
&i.Input,
&i.WorkerID,
)
return i, err
}
const insertUser = `-- name: InsertUser :one
INSERT INTO
users (
@@ -1349,6 +1802,58 @@ func (q *sqlQuerier) UpdateAPIKeyByID(ctx context.Context, arg UpdateAPIKeyByIDP
return err
}
const updateProvisionerDaemonByID = `-- name: UpdateProvisionerDaemonByID :exec
UPDATE
provisioner_daemon
SET
updated_at = $2,
provisioners = $3
WHERE
id = $1
`
type UpdateProvisionerDaemonByIDParams struct {
ID uuid.UUID `db:"id" json:"id"`
UpdatedAt sql.NullTime `db:"updated_at" json:"updated_at"`
Provisioners []ProvisionerType `db:"provisioners" json:"provisioners"`
}
func (q *sqlQuerier) UpdateProvisionerDaemonByID(ctx context.Context, arg UpdateProvisionerDaemonByIDParams) error {
_, err := q.db.ExecContext(ctx, updateProvisionerDaemonByID, arg.ID, arg.UpdatedAt, pq.Array(arg.Provisioners))
return err
}
const updateProvisionerJobByID = `-- name: UpdateProvisionerJobByID :exec
UPDATE
provisioner_job
SET
updated_at = $2,
cancelled_at = $3,
completed_at = $4,
error = $5
WHERE
id = $1
`
type UpdateProvisionerJobByIDParams struct {
ID uuid.UUID `db:"id" json:"id"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
CancelledAt sql.NullTime `db:"cancelled_at" json:"cancelled_at"`
CompletedAt sql.NullTime `db:"completed_at" json:"completed_at"`
Error sql.NullString `db:"error" json:"error"`
}
func (q *sqlQuerier) UpdateProvisionerJobByID(ctx context.Context, arg UpdateProvisionerJobByIDParams) error {
_, err := q.db.ExecContext(ctx, updateProvisionerJobByID,
arg.ID,
arg.UpdatedAt,
arg.CancelledAt,
arg.CompletedAt,
arg.Error,
)
return err
}
const updateWorkspaceHistoryByID = `-- name: UpdateWorkspaceHistoryByID :exec
UPDATE
workspace_history