Files
coder/database/query.sql
T
Kyle Carberry e75bde4e31 feat: Add provisionerdaemon to coderd (#141)
* feat: Add history middleware parameters

These will be used for streaming logs, checking status,
and other operations related to workspace and project
history.

* refactor: Move all HTTP routes to top-level struct

Nesting all structs behind their respective structures
is leaky, and promotes naming conflicts between handlers.

Our HTTP routes cannot have conflicts, so neither should
function naming.

* Add provisioner daemon routes

* Add periodic updates

* Skip pubsub if short

* Return jobs with WorkspaceHistory

* Add endpoints for extracting singular history

* The full end-to-end operation works

* fix: Disable compression for websocket dRPC transport (#145)

There is a race condition in the interop between the websocket and `dRPC`: https://github.com/coder/coder/runs/5038545709?check_suite_focus=true#step:7:117 - it seems both the websocket and dRPC feel like they own the `byte[]` being sent between them. This can lead to data races, in which both `dRPC` and the websocket are writing.

This is just tracking some experimentation to fix that race condition

## Run results: ##
- Run 1: peer test failure
- Run 2: peer test failure
- Run 3: `TestWorkspaceHistory/CreateHistory`  - https://github.com/coder/coder/runs/5040858460?check_suite_focus=true#step:8:45
```
status code 412: The provided project history is running. Wait for it to complete importing!`
```
- Run 4: `TestWorkspaceHistory/CreateHistory` - https://github.com/coder/coder/runs/5040957999?check_suite_focus=true#step:7:176
```
    workspacehistory_test.go:122: 
        	Error Trace:	workspacehistory_test.go:122
        	Error:      	Condition never satisfied
        	Test:       	TestWorkspaceHistory/CreateHistory
```
- Run 5: peer failure
- Run 6: Pass  
- Run 7: Peer failure

## Open Questions: ##

### Is `dRPC` or `websocket` at fault for the data race?

It looks like this condition is specifically happening when `dRPC` decides to [`SendError`]). This constructs a new byte payload from [`MarshalError`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/error.go#L15) - so `dRPC` has created this buffer and owns it.

From `dRPC`'s perspective, the callstack looks like this:
- [`sendPacket`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcstream/stream.go#L253)
  - [`writeFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L65)
    - [`AppendFrame`](https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/packet.go#L128)
      - with finally the data race happening here:
```go
// AppendFrame appends a marshaled form of the frame to the provided buffer.
func AppendFrame(buf []byte, fr Frame) []byte {
...
	out := buf
	out = append(out, control).   // <---------
```

This should be fine, since `dPRC` create this buffer, and is taking the byte buffer constructed from `MarshalError` and tacking a bunch of headers on it to create a proper frame.

Once `dRPC` is done writing, it _hangs onto the buffer and resets it here__: https://github.com/storj/drpc/blob/f6e369438f636b47ee788095d3fc13062ffbd019/drpcwire/writer.go#L73

However... the websocket implementation, once it gets the buffer, it runs a `statelessDeflate` [here](https://github.com/nhooyr/websocket/blob/8dee580a7f74cf1713400307b4eee514b927870f/write.go#L180), which compresses the buffer on the fly. This functionality actually [mutates the buffer in place](https://github.com/klauspost/compress/blob/a1a9cfc821f00faf2f5231beaa96244344d50391/flate/stateless.go#L94), which is where get our race.

In the case where the `byte[]` aren't being manipulated anywhere else, this compress-in-place operation would be safe, and that's probably the case for most over-the-wire usages. In this case, though, where we're plumbing `dRPC` -> websocket, they both are manipulating it (`dRPC` is reusing the buffer for the next `write`, and `websocket` is compressing on the fly).

### Why does cloning on `Read` fail?

Get a bunch of errors like:
```
2022/02/02 19:26:10 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [ERR] yamux: Failed to read header: unexpected EOF
2022/02/02 19:26:25 [WARN] yamux: frame for missing stream: Vsn:0 Type:0 Flags:0 StreamID:0 Length:0
```

# UPDATE:

We decided we could disable websocket compression, which would avoid the race because the in-place `deflate` operaton would no longer be run. Trying that out now:

- Run 1:  
- Run 2: https://github.com/coder/coder/runs/5042645522?check_suite_focus=true#step:8:338
- Run 3:  
- Run 4: https://github.com/coder/coder/runs/5042988758?check_suite_focus=true#step:7:168
- Run 5: 

* fix: Remove race condition with acquiredJobDone channel (#148)

Found another data race while running the tests: https://github.com/coder/coder/runs/5044320845?check_suite_focus=true#step:7:83

__Issue:__ There is a race in the p.acquiredJobDone chan - in particular, there can be a case where we're waiting on the channel to finish (in close) with <-p.acquiredJobDone, but in parallel, an acquireJob could've been started, which would create a new channel for p.acquiredJobDone. There is a similar race in `close(..)`ing the channel, which also came up in test runs.

__Fix:__ Instead of recreating the channel everytime, we can use `sync.WaitGroup` to accomplish the same functionality - a semaphore to make close wait for the current job to wrap up.

* fix: Bump up workspace history timeout (#149)

This is an attempted fix for failures like: https://github.com/coder/coder/runs/5043435263?check_suite_focus=true#step:7:32

Looking at the timing of the test:
```
    t.go:56: 2022-02-02 21:33:21.964 [DEBUG]	(terraform-provisioner)	<provision.go:139>	ran apply
    t.go:56: 2022-02-02 21:33:21.991 [DEBUG]	(provisionerd)	<provisionerd.go:162>	skipping acquire; job is already running
    t.go:56: 2022-02-02 21:33:22.050 [DEBUG]	(provisionerd)	<provisionerd.go:162>	skipping acquire; job is already running
    t.go:56: 2022-02-02 21:33:22.090 [DEBUG]	(provisionerd)	<provisionerd.go:162>	skipping acquire; job is already running
    t.go:56: 2022-02-02 21:33:22.140 [DEBUG]	(provisionerd)	<provisionerd.go:162>	skipping acquire; job is already running
    t.go:56: 2022-02-02 21:33:22.195 [DEBUG]	(provisionerd)	<provisionerd.go:162>	skipping acquire; job is already running
    t.go:56: 2022-02-02 21:33:22.240 [DEBUG]	(provisionerd)	<provisionerd.go:162>	skipping acquire; job is already running
    workspacehistory_test.go:122: 
        	Error Trace:	workspacehistory_test.go:122
        	Error:      	Condition never satisfied
        	Test:       	TestWorkspaceHistory/CreateHistory
```

It  appears that the `terraform apply` job had just finished - with less than a second to spare until our `require.Eventually` completes - but there's still work to be done (ie, collecting the state files). So my suspicion is that terraform might, in some cases, exceed our 5s timeout.

Note that in the setup for this test - there is a similar project history wait that waits for 15s, so I borrowed that here.

In the future - we can look at potentially using a simple echo provider to exercise this in the unit test, in a way that is more reliable in terms of timing. I'll log an issue to track that.

Co-authored-by: Bryan <bryan@coder.com>
2022-02-03 20:34:50 +00:00

626 lines
9.6 KiB
SQL

-- Database queries are generated using sqlc. See:
-- https://docs.sqlc.dev/en/latest/tutorials/getting-started-postgresql.html
--
-- Run "make gen" to generate models and query functions.
;
-- Acquires the lock for a single job that isn't started, completed,
-- cancelled, and that matches an array of provisioner types.
--
-- SKIP LOCKED is used to jump over locked rows. This prevents
-- multiple provisioners from acquiring the same jobs. See:
-- https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
-- name: AcquireProvisionerJob :one
UPDATE
provisioner_job
SET
started_at = @started_at,
updated_at = @started_at,
worker_id = @worker_id
WHERE
id = (
SELECT
id
FROM
provisioner_job AS nested
WHERE
nested.started_at IS NULL
AND nested.cancelled_at IS NULL
AND nested.completed_at IS NULL
AND nested.provisioner = ANY(@types :: provisioner_type [ ])
ORDER BY
nested.created_at FOR
UPDATE
SKIP LOCKED
LIMIT
1
) RETURNING *;
-- name: GetAPIKeyByID :one
SELECT
*
FROM
api_keys
WHERE
id = $1
LIMIT
1;
-- name: GetUserByID :one
SELECT
*
FROM
users
WHERE
id = $1
LIMIT
1;
-- name: GetUserByEmailOrUsername :one
SELECT
*
FROM
users
WHERE
LOWER(username) = LOWER(@username)
OR email = @email
LIMIT
1;
-- name: GetUserCount :one
SELECT
COUNT(*)
FROM
users;
-- name: GetOrganizationByID :one
SELECT
*
FROM
organizations
WHERE
id = $1;
-- name: GetOrganizationByName :one
SELECT
*
FROM
organizations
WHERE
LOWER(name) = LOWER(@name)
LIMIT
1;
-- name: GetOrganizationsByUserID :many
SELECT
*
FROM
organizations
WHERE
id = (
SELECT
organization_id
FROM
organization_members
WHERE
user_id = $1
);
-- name: GetOrganizationMemberByUserID :one
SELECT
*
FROM
organization_members
WHERE
organization_id = $1
AND user_id = $2
LIMIT
1;
-- name: GetParameterValuesByScope :many
SELECT
*
FROM
parameter_value
WHERE
scope = $1
AND scope_id = $2;
-- name: GetProjectByID :one
SELECT
*
FROM
project
WHERE
id = $1
LIMIT
1;
-- name: GetProjectByOrganizationAndName :one
SELECT
*
FROM
project
WHERE
organization_id = @organization_id
AND LOWER(name) = LOWER(@name)
LIMIT
1;
-- name: GetProjectsByOrganizationIDs :many
SELECT
*
FROM
project
WHERE
organization_id = ANY(@ids :: text [ ]);
-- name: GetProjectParametersByHistoryID :many
SELECT
*
FROM
project_parameter
WHERE
project_history_id = $1;
-- name: GetProjectHistoryByProjectID :many
SELECT
*
FROM
project_history
WHERE
project_id = $1;
-- name: GetProjectHistoryByProjectIDAndName :one
SELECT
*
FROM
project_history
WHERE
project_id = $1
AND name = $2;
-- name: GetProjectHistoryByID :one
SELECT
*
FROM
project_history
WHERE
id = $1;
-- name: GetProjectHistoryLogsByIDBefore :many
SELECT
*
FROM
project_history_log
WHERE
project_history_id = $1
AND created_at <= $2
ORDER BY
created_at;
-- name: GetProvisionerDaemons :many
SELECT
*
FROM
provisioner_daemon;
-- name: GetProvisionerDaemonByID :one
SELECT
*
FROM
provisioner_daemon
WHERE
id = $1;
-- name: GetProvisionerJobByID :one
SELECT
*
FROM
provisioner_job
WHERE
id = $1;
-- name: GetWorkspaceByID :one
SELECT
*
FROM
workspace
WHERE
id = $1
LIMIT
1;
-- name: GetWorkspacesByUserID :many
SELECT
*
FROM
workspace
WHERE
owner_id = $1;
-- name: GetWorkspaceByUserIDAndName :one
SELECT
*
FROM
workspace
WHERE
owner_id = @owner_id
AND LOWER(name) = LOWER(@name);
-- name: GetWorkspacesByProjectAndUserID :many
SELECT
*
FROM
workspace
WHERE
owner_id = $1
AND project_id = $2;
-- name: GetWorkspaceHistoryByID :one
SELECT
*
FROM
workspace_history
WHERE
id = $1
LIMIT
1;
-- name: GetWorkspaceHistoryByWorkspaceIDAndName :one
SELECT
*
FROM
workspace_history
WHERE
workspace_id = $1
AND name = $2;
-- name: GetWorkspaceHistoryByWorkspaceID :many
SELECT
*
FROM
workspace_history
WHERE
workspace_id = $1;
-- name: GetWorkspaceHistoryByWorkspaceIDWithoutAfter :one
SELECT
*
FROM
workspace_history
WHERE
workspace_id = $1
AND after_id IS NULL
LIMIT
1;
-- name: GetWorkspaceHistoryLogsByIDBefore :many
SELECT
*
FROM
workspace_history_log
WHERE
workspace_history_id = $1
AND created_at <= $2
ORDER BY
created_at;
-- name: GetWorkspaceResourcesByHistoryID :many
SELECT
*
FROM
workspace_resource
WHERE
workspace_history_id = $1;
-- name: GetWorkspaceAgentsByResourceIDs :many
SELECT
*
FROM
workspace_agent
WHERE
workspace_resource_id = ANY(@ids :: uuid [ ]);
-- name: InsertAPIKey :one
INSERT INTO
api_keys (
id,
hashed_secret,
user_id,
application,
name,
last_used,
expires_at,
created_at,
updated_at,
login_type,
oidc_access_token,
oidc_refresh_token,
oidc_id_token,
oidc_expiry,
devurl_token
)
VALUES
(
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12,
$13,
$14,
$15
) RETURNING *;
-- name: InsertOrganization :one
INSERT INTO
organizations (id, name, description, created_at, updated_at)
VALUES
($1, $2, $3, $4, $5) RETURNING *;
-- name: InsertOrganizationMember :one
INSERT INTO
organization_members (
organization_id,
user_id,
created_at,
updated_at,
roles
)
VALUES
($1, $2, $3, $4, $5) RETURNING *;
-- name: InsertParameterValue :one
INSERT INTO
parameter_value (
id,
name,
created_at,
updated_at,
scope,
scope_id,
source_scheme,
source_value,
destination_scheme,
destination_value
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *;
-- name: InsertProject :one
INSERT INTO
project (
id,
created_at,
updated_at,
organization_id,
name,
provisioner
)
VALUES
($1, $2, $3, $4, $5, $6) RETURNING *;
-- name: InsertProjectHistory :one
INSERT INTO
project_history (
id,
project_id,
created_at,
updated_at,
name,
description,
storage_method,
storage_source,
import_job_id
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *;
-- name: InsertProjectHistoryLogs :many
INSERT INTO
project_history_log
SELECT
@project_history_id :: uuid AS project_history_id,
unnest(@id :: uuid [ ]) AS id,
unnest(@created_at :: timestamptz [ ]) AS created_at,
unnest(@source :: log_source [ ]) as source,
unnest(@level :: log_level [ ]) as level,
unnest(@output :: varchar(1024) [ ]) as output RETURNING *;
-- name: InsertProjectParameter :one
INSERT INTO
project_parameter (
id,
created_at,
project_history_id,
name,
description,
default_source_scheme,
default_source_value,
allow_override_source,
default_destination_scheme,
default_destination_value,
allow_override_destination,
default_refresh,
redisplay_value,
validation_error,
validation_condition,
validation_type_system,
validation_value_type
)
VALUES
(
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12,
$13,
$14,
$15,
$16,
$17
) RETURNING *;
-- name: InsertProvisionerDaemon :one
INSERT INTO
provisioner_daemon (id, created_at, name, provisioners)
VALUES
($1, $2, $3, $4) RETURNING *;
-- name: InsertProvisionerJob :one
INSERT INTO
provisioner_job (
id,
created_at,
updated_at,
initiator_id,
provisioner,
type,
project_id,
input
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *;
-- name: InsertUser :one
INSERT INTO
users (
id,
email,
name,
login_type,
revoked,
hashed_password,
created_at,
updated_at,
username
)
VALUES
($1, $2, $3, $4, false, $5, $6, $7, $8) RETURNING *;
-- name: InsertWorkspace :one
INSERT INTO
workspace (
id,
created_at,
updated_at,
owner_id,
project_id,
name
)
VALUES
($1, $2, $3, $4, $5, $6) RETURNING *;
-- name: InsertWorkspaceAgent :one
INSERT INTO
workspace_agent (
id,
workspace_resource_id,
created_at,
updated_at,
instance_metadata,
resource_metadata
)
VALUES
($1, $2, $3, $4, $5, $6) RETURNING *;
-- name: InsertWorkspaceHistory :one
INSERT INTO
workspace_history (
id,
created_at,
updated_at,
workspace_id,
project_history_id,
before_id,
name,
transition,
initiator,
provision_job_id,
provisioner_state
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *;
-- name: InsertWorkspaceHistoryLogs :many
INSERT INTO
workspace_history_log
SELECT
unnest(@id :: uuid [ ]) AS id,
@workspace_history_id :: uuid AS workspace_history_id,
unnest(@created_at :: timestamptz [ ]) AS created_at,
unnest(@source :: log_source [ ]) as source,
unnest(@level :: log_level [ ]) as level,
unnest(@output :: varchar(1024) [ ]) as output RETURNING *;
-- name: InsertWorkspaceResource :one
INSERT INTO
workspace_resource (
id,
created_at,
workspace_history_id,
type,
name,
workspace_agent_token
)
VALUES
($1, $2, $3, $4, $5, $6) RETURNING *;
-- name: UpdateAPIKeyByID :exec
UPDATE
api_keys
SET
last_used = $2,
expires_at = $3,
oidc_access_token = $4,
oidc_refresh_token = $5,
oidc_expiry = $6
WHERE
id = $1;
-- name: UpdateProvisionerDaemonByID :exec
UPDATE
provisioner_daemon
SET
updated_at = $2,
provisioners = $3
WHERE
id = $1;
-- name: UpdateProvisionerJobByID :exec
UPDATE
provisioner_job
SET
updated_at = $2,
cancelled_at = $3,
completed_at = $4,
error = $5
WHERE
id = $1;
-- name: UpdateWorkspaceHistoryByID :exec
UPDATE
workspace_history
SET
updated_at = $2,
after_id = $3,
provisioner_state = $4
WHERE
id = $1;