fix: make prebuild claiming durable and idempotent (#23108)

## Problem

When a prebuilt workspace is claimed, the agent reinitializes via a
single fire-and-forget pubsub event over SSE. If the agent's SSE
connection is interrupted at claim time, the event is permanently lost —
the workspace is stuck with no self-healing path.

Additionally, regular (non-prebuild) workspaces had no way to opt out of
the `/reinit` polling loop — agents would reconnect indefinitely to an
endpoint that would never send them anything useful.

## Root Cause

`workspaceAgentReinit` fetches the workspace (with its current
`owner_id`) via `GetWorkspaceByAgentID`, but never checked whether a
claim already happened. It only subscribed to pubsub for future events.
The database already has durable claim state (`owner_id` changes from
`PrebuildsSystemUserID` to the real user), but no layer ever consulted
it on reconnection.

## Solution

### Server-side durable check with first-build-initiator gating

**TOCTOU-safe ordering**: Subscribe to pubsub claim events *before* any
durable checks, so a claim that fires during the check is buffered in
the channel rather than lost.
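
The ordering argument can be illustrated with a minimal, self-contained sketch. The `broker` type below is a toy single-subscriber stand-in invented for this example; it is not the `pubsub.Pubsub` interface, and `event` only loosely mirrors `agentsdk.ReinitializationEvent`.

```go
package main

import "fmt"

// event is a hypothetical stand-in for agentsdk.ReinitializationEvent.
type event struct{ Reason string }

// broker is a toy single-subscriber pubsub used only for illustration.
type broker struct{ sub chan event }

// subscribe registers a buffered channel so that a publish racing with
// the caller's later checks is held in the buffer instead of dropped.
func (b *broker) subscribe() <-chan event {
	b.sub = make(chan event, 1)
	return b.sub
}

// publish delivers to the current subscriber. With no subscriber the
// event is lost, which models fire-and-forget delivery.
func (b *broker) publish(e event) bool {
	if b.sub == nil {
		return false
	}
	select {
	case b.sub <- e:
		return true
	default:
		return false
	}
}

// poll reports whether an event is already waiting on ch.
func poll(ch <-chan event) (event, bool) {
	select {
	case e := <-ch:
		return e, true
	default:
		return event{}, false
	}
}

// subscribeThenCheck subscribes first, then runs the durable check.
func subscribeThenCheck(b *broker, durableCheck func()) (event, bool) {
	ch := b.subscribe()
	durableCheck()
	return poll(ch)
}

// checkThenSubscribe runs the durable check first (the racy ordering).
func checkThenSubscribe(b *broker, durableCheck func()) (event, bool) {
	durableCheck()
	return poll(b.subscribe())
}

func main() {
	claim := event{Reason: "prebuild_claimed"}

	// In both cases the claim fires while the durable check is running.
	safe := &broker{}
	_, ok := subscribeThenCheck(safe, func() { safe.publish(claim) })
	fmt.Println("subscribe first, event seen:", ok)

	racy := &broker{}
	_, ok = checkThenSubscribe(racy, func() { racy.publish(claim) })
	fmt.Println("check first, event seen:", ok)
}
```

The buffered channel is what makes the first ordering safe in this sketch: the publish completes even though nobody is receiving yet.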

**First-build-initiator gating**: When the agent opts in with the `wait`
query parameter and `!workspace.IsPrebuild()` (owner is no longer the
system user), look up the first build's `InitiatorID`. The prebuild
reconciler always uses `PrebuildsSystemUserID` as the initiator, which
distinguishes claimed prebuilds from regular workspaces without any SQL
schema changes.

- **Regular workspace** (first build initiator ≠ system user) → **409
Conflict**, agent stops reconnecting
- **Claimed prebuild, build completed** → pre-seed channel with reinit
event and close it, transmitter delivers one-shot then exits
- **Claimed prebuild, build failed** → **409 Conflict**, agent stops
reconnecting
- **Claimed prebuild, build in-progress** → fall through to pubsub
subscription, agent waits for completion event
- **Unclaimed prebuild** → pubsub subscription (existing happy path)
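
The cases above can be read as a small decision function. The sketch below is only an illustration: the `facts` struct and its field names are hypothetical, and the failed-build branch is taken from the handler shown in the diff rather than from this list.

```go
package main

import "fmt"

type action string

const (
	waitOnPubsub action = "wait on pubsub"
	oneShot      action = "one-shot reinit"
	conflict     action = "409 Conflict"
)

// facts holds the values the handler reads from the database.
// The field names are hypothetical and exist only for this sketch.
type facts struct {
	ownerIsSystemUser      bool // workspace.IsPrebuild()
	firstBuildBySystemUser bool // first build's InitiatorID is the prebuilds system user
	claimBuildDone         bool // latest build's provisioner job has completed
	claimBuildFailed       bool // the completed job recorded an error
}

// decide maps the durable state to what the reinit endpoint does.
func decide(f facts) action {
	switch {
	case f.ownerIsSystemUser:
		return waitOnPubsub // unclaimed prebuild
	case !f.firstBuildBySystemUser:
		return conflict // regular workspace
	case f.claimBuildDone && f.claimBuildFailed:
		return conflict // claim build failed
	case f.claimBuildDone:
		return oneShot // claim build succeeded
	default:
		return waitOnPubsub // claim build still running
	}
}

func main() {
	fmt.Println("unclaimed prebuild:", decide(facts{ownerIsSystemUser: true, firstBuildBySystemUser: true}))
	fmt.Println("regular workspace:", decide(facts{}))
	fmt.Println("claimed, completed:", decide(facts{firstBuildBySystemUser: true, claimBuildDone: true}))
	fmt.Println("claimed, in progress:", decide(facts{firstBuildBySystemUser: true}))
}
```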

### Declarative reinit events (defense-in-depth)

- Added `OwnerID` field (JSON key `owner_id`) to `ReinitializationEvent`,
with JSON tags on all fields
- Switched pubsub serialization from raw string to JSON (with
backward-compat fallback for rolling upgrades)
- Populated `OwnerID` at both the publish site and the durable check
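
A rough sketch of the JSON-with-fallback decoding follows. It assumes plain strings for the IDs so that it runs with the standard library only; the real event uses `uuid.UUID`, and the names `reinitEvent` and `decodeClaim` are invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reinitEvent loosely mirrors agentsdk.ReinitializationEvent, using
// strings instead of uuid.UUID to keep the sketch dependency-free.
type reinitEvent struct {
	WorkspaceID string `json:"workspace_id"`
	Reason      string `json:"reason"`
	OwnerID     string `json:"owner_id"`
}

// decodeClaim tries JSON first. If the payload is not valid JSON it is
// treated as a legacy raw reason string, and the workspace ID is filled
// in from the subscription context. OwnerID stays empty in that case.
func decodeClaim(payload []byte, workspaceID string) reinitEvent {
	var ev reinitEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return reinitEvent{WorkspaceID: workspaceID, Reason: string(payload)}
	}
	return ev
}

func main() {
	modern, err := json.Marshal(reinitEvent{
		WorkspaceID: "ws-1",
		Reason:      "prebuild_claimed",
		OwnerID:     "user-1",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", decodeClaim(modern, "ws-1"))

	// Payload as an old publisher would send it: the bare reason.
	fmt.Printf("%+v\n", decodeClaim([]byte("prebuild_claimed"), "ws-1"))
}
```

A bare reason string is not valid JSON, so `json.Unmarshal` returns an error and the fallback path runs.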

### Agent SDK: 409 handling

`WaitForReinitLoop` detects 409 Conflict from the server and closes the
`reinitEvents` channel, cleanly exiting the retry goroutine.
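
The shape of that loop can be sketched as follows. This is not the body of `WaitForReinitLoop`: `statusError`, `waitLoop`, and `demoPoll` are hypothetical names, and the real loop would also honor the context and back off between retries.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// statusError is a hypothetical error type carrying an HTTP status.
type statusError struct{ code int }

func (e *statusError) Error() string { return http.StatusText(e.code) }

// waitLoop polls repeatedly and forwards events. A 409 Conflict is
// terminal: the goroutine returns and the deferred close signals
// receivers that no further events will arrive.
func waitLoop(poll func() (string, error)) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			ev, err := poll()
			var se *statusError
			if errors.As(err, &se) && se.code == http.StatusConflict {
				return
			}
			if err != nil {
				continue // transient failure: retry
			}
			out <- ev
		}
	}()
	return out
}

// demoPoll scripts one transient error, one event, then a 409.
func demoPoll() func() (string, error) {
	calls := 0
	return func() (string, error) {
		calls++
		switch calls {
		case 1:
			return "", errors.New("connection reset")
		case 2:
			return "prebuild_claimed", nil
		default:
			return "", &statusError{code: http.StatusConflict}
		}
	}
}

func main() {
	for ev := range waitLoop(demoPoll()) {
		fmt.Println("event:", ev)
	}
	fmt.Println("channel closed, loop exited")
}
```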

### Agent CLI: fixed two bugs + added reinitCtx

- **Closed channel (`!ok`)**: now blocks on `<-ctx.Done()` instead of
`continue`, keeping the current agent running. Previously this would
leak agents by skipping `agnt.Close()` and re-entering the loop.
- **Duplicate owner reinit**: cancels `reinitCtx` (stops the reinit
goroutine), then blocks on `<-ctx.Done()`. Previously `continue` would
skip cleanup and create a new agent on the next loop iteration.
- **`reinitCtx`**: a cancellable child of `ctx` passed to
`WaitForReinitLoop`, allowing the agent to stop the reinit HTTP polling
after reinit completes.

### Agent-side idempotency

Tracks `lastOwnerID` in the agent reinit loop — duplicate events for the
same owner are skipped.
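
A simplified model of the agent loop shows how the closed-channel check and the `lastOwnerID` guard interact. The names `runAgentLoop` and `demo` are made up for this sketch, owner IDs are plain strings, and "starting an agent" is reduced to incrementing a counter.

```go
package main

import (
	"context"
	"fmt"
)

// reinitEvent is a pared-down, hypothetical event with a string owner.
type reinitEvent struct{ OwnerID string }

// runAgentLoop returns how many times the agent was started. A closed
// channel or a duplicate owner keeps the current agent running until
// ctx is canceled instead of restarting it.
func runAgentLoop(ctx context.Context, events <-chan reinitEvent) int {
	starts := 0
	lastOwner := ""
	for {
		starts++ // stands in for creating and running an agent
		select {
		case <-ctx.Done():
			return starts
		case ev, ok := <-events:
			if !ok || (ev.OwnerID != "" && ev.OwnerID == lastOwner) {
				<-ctx.Done()
				return starts
			}
			lastOwner = ev.OwnerID
		}
	}
}

// demo sends the same owner twice over an unbuffered channel. Each send
// returns only once the loop has received it, so cancel runs after the
// duplicate has been observed.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	events := make(chan reinitEvent)
	done := make(chan int)
	go func() { done <- runAgentLoop(ctx, events) }()
	events <- reinitEvent{OwnerID: "user-1"}
	events <- reinitEvent{OwnerID: "user-1"}
	cancel()
	return <-done
}

func main() {
	fmt.Println("agent starts:", demo())
}
```

In this model the first event triggers one restart and the duplicate triggers none, so the agent is started twice in total.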

## Testing

- **"unclaimed prebuild receives reinit via pubsub"**: prebuild owned by
system user, pubsub event triggers reinit
- **"claimed prebuild receives one-shot reinit on reconnect"**: first
build by system user, owner changed, build completed → immediate reinit
(no pubsub needed)
- **"claimed prebuild waits during in-progress claim build"**: claimed
but build still running → no reinit until build completes
- **"regular workspace gets 409"**: first build by real user → 409
Conflict, agent stops polling
- Updated claim publisher/listener tests: verify `OwnerID` survives JSON
round-trip + backward compat with raw string payloads
- Updated SSE round-trip test: verify `OwnerID` survives transmit →
receive cycle

Fixes #22359

## Rolling upgrade note

During a rolling deploy where old coderd instances coexist with new
ones, the pubsub `ReinitializationEvent` has a new `OwnerID` field
(JSON key `owner_id`). Old publishers send a raw reason string
instead of JSON; the new listener gracefully falls back by treating the
entire payload as the reason and filling in `WorkspaceID` from context.
The only visible effect during the upgrade window is that `OwnerID`
may be the zero UUID in agent-side logs. This is cosmetic and resolves
once all instances are updated.
Sas Swart
2026-04-02 23:51:02 +02:00
committed by GitHub
parent 990c006f28
commit 5b6b7719df
13 changed files with 446 additions and 90 deletions
+33 -6
@@ -17,6 +17,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/xerrors"
 	"gopkg.in/natefinch/lumberjack.v2"
@@ -272,11 +273,14 @@ func workspaceAgent() *serpent.Command {
 				logger.Info(ctx, "agent devcontainer detection not enabled")
 			}
 
-			reinitEvents := agentsdk.WaitForReinitLoop(ctx, logger, client)
+			reinitCtx, reinitCancel := context.WithCancel(ctx)
+			defer reinitCancel()
+			reinitEvents := agentsdk.WaitForReinitLoop(reinitCtx, logger, client)
 
 			var (
-				lastErr  error
-				mustExit bool
+				lastOwnerID uuid.UUID
+				lastErr     error
+				mustExit    bool
 			)
 			for {
 				prometheusRegistry := prometheus.NewRegistry()
@@ -343,9 +347,32 @@ func workspaceAgent() *serpent.Command {
 				case <-ctx.Done():
 					logger.Info(ctx, "agent shutting down", slog.Error(context.Cause(ctx)))
 					mustExit = true
-				case event := <-reinitEvents:
-					logger.Info(ctx, "agent received instruction to reinitialize",
-						slog.F("workspace_id", event.WorkspaceID), slog.F("reason", event.Reason))
+				case event, ok := <-reinitEvents:
+					switch {
+					case !ok:
+						// Channel closed — the reinit loop exited
+						// (terminal 409 or context expired). Keep
+						// running the current agent until the parent
+						// context is canceled.
+						logger.Info(ctx, "reinit channel closed, running without reinit capability")
+						reinitEvents = nil
+						<-ctx.Done()
+						mustExit = true
+					case event.OwnerID != uuid.Nil && event.OwnerID == lastOwnerID:
+						// Duplicate reinit for same owner — already
+						// reinitialized. Cancel the reinit loop
+						// goroutine and keep the current agent.
+						logger.Info(ctx, "skipping redundant reinit, owner unchanged",
+							slog.F("owner_id", event.OwnerID))
+						reinitCancel()
+						reinitEvents = nil
+						<-ctx.Done()
+						mustExit = true
+					default:
+						lastOwnerID = event.OwnerID
+						logger.Info(ctx, "agent received instruction to reinitialize",
+							slog.F("workspace_id", event.WorkspaceID), slog.F("reason", event.Reason))
+					}
 				}
 
 				lastErr = agnt.Close()
+21 -2
@@ -10205,12 +10205,26 @@ const docTemplate = `{
                 ],
                 "summary": "Get workspace agent reinitialization",
                 "operationId": "get-workspace-agent-reinitialization",
+                "parameters": [
+                    {
+                        "type": "boolean",
+                        "description": "Opt in to durable reinit checks",
+                        "name": "wait",
+                        "in": "query"
+                    }
+                ],
                 "responses": {
                     "200": {
                         "description": "OK",
                         "schema": {
                             "$ref": "#/definitions/agentsdk.ReinitializationEvent"
                         }
+                    },
+                    "409": {
+                        "description": "Conflict",
+                        "schema": {
+                            "$ref": "#/definitions/codersdk.Response"
+                        }
                     }
                 },
                 "security": [
@@ -12647,11 +12661,16 @@ const docTemplate = `{
         "agentsdk.ReinitializationEvent": {
             "type": "object",
             "properties": {
+                "owner_id": {
+                    "type": "string",
+                    "format": "uuid"
+                },
                 "reason": {
                     "$ref": "#/definitions/agentsdk.ReinitializationReason"
                 },
-                "workspaceID": {
-                    "type": "string"
+                "workspace_id": {
+                    "type": "string",
+                    "format": "uuid"
                 }
             }
         },
+21 -2
@@ -9038,12 +9038,26 @@
 				"tags": ["Agents"],
 				"summary": "Get workspace agent reinitialization",
 				"operationId": "get-workspace-agent-reinitialization",
+				"parameters": [
+					{
+						"type": "boolean",
+						"description": "Opt in to durable reinit checks",
+						"name": "wait",
+						"in": "query"
+					}
+				],
 				"responses": {
 					"200": {
 						"description": "OK",
 						"schema": {
 							"$ref": "#/definitions/agentsdk.ReinitializationEvent"
 						}
+					},
+					"409": {
+						"description": "Conflict",
+						"schema": {
+							"$ref": "#/definitions/codersdk.Response"
+						}
 					}
 				},
 				"security": [
@@ -11229,11 +11243,16 @@
 		"agentsdk.ReinitializationEvent": {
 			"type": "object",
 			"properties": {
+				"owner_id": {
+					"type": "string",
+					"format": "uuid"
+				},
 				"reason": {
 					"$ref": "#/definitions/agentsdk.ReinitializationReason"
 				},
-				"workspaceID": {
-					"type": "string"
+				"workspace_id": {
+					"type": "string",
+					"format": "uuid"
 				}
 			}
 		},
+29 -16
@@ -2,6 +2,7 @@ package prebuilds
 
 import (
 	"context"
+	"encoding/json"
 	"sync"
 
 	"github.com/google/uuid"
@@ -22,7 +23,11 @@ type PubsubWorkspaceClaimPublisher struct {
 
 func (p PubsubWorkspaceClaimPublisher) PublishWorkspaceClaim(claim agentsdk.ReinitializationEvent) error {
 	channel := agentsdk.PrebuildClaimedChannel(claim.WorkspaceID)
-	if err := p.ps.Publish(channel, []byte(claim.Reason)); err != nil {
+	payload, err := json.Marshal(claim)
+	if err != nil {
+		return xerrors.Errorf("marshal claim event: %w", err)
+	}
+	if err := p.ps.Publish(channel, payload); err != nil {
 		return xerrors.Errorf("failed to trigger prebuilt workspace agent reinitialization: %w", err)
 	}
 	return nil
@@ -37,33 +42,41 @@ type PubsubWorkspaceClaimListener struct {
 	ps pubsub.Pubsub
 }
 
-// ListenForWorkspaceClaims subscribes to a pubsub channel and sends any received events on the chan that it returns.
-// pubsub.Pubsub does not communicate when its last callback has been called after it has been closed. As such the chan
-// returned by this method is never closed. Call the returned cancel() function to close the subscription when it is no longer needed.
-// cancel() will be called if ctx expires or is canceled.
-func (p PubsubWorkspaceClaimListener) ListenForWorkspaceClaims(ctx context.Context, workspaceID uuid.UUID, reinitEvents chan<- agentsdk.ReinitializationEvent) (func(), error) {
+// ListenForWorkspaceClaims subscribes to a pubsub channel and returns a
+// receive-only channel that emits claim events for the given workspace.
+// The returned channel is owned by this function and is never closed,
+// because pubsub.Pubsub does not guarantee that all in-flight callbacks
+// have returned after unsubscribe. Call the returned cancel function to
+// unsubscribe when events are no longer needed; cancel is also called
+// automatically if ctx expires or is canceled.
+func (p PubsubWorkspaceClaimListener) ListenForWorkspaceClaims(ctx context.Context, workspaceID uuid.UUID) (<-chan agentsdk.ReinitializationEvent, func(), error) {
 	select {
 	case <-ctx.Done():
-		return func() {}, ctx.Err()
+		return nil, func() {}, ctx.Err()
 	default:
 	}
 
-	cancelSub, err := p.ps.Subscribe(agentsdk.PrebuildClaimedChannel(workspaceID), func(inner context.Context, reason []byte) {
-		claim := agentsdk.ReinitializationEvent{
-			WorkspaceID: workspaceID,
-			Reason:      agentsdk.ReinitializationReason(reason),
+	reinitEvents := make(chan agentsdk.ReinitializationEvent, 1)
+
+	cancelSub, err := p.ps.Subscribe(agentsdk.PrebuildClaimedChannel(workspaceID), func(inner context.Context, payload []byte) {
+		var event agentsdk.ReinitializationEvent
+		if err := json.Unmarshal(payload, &event); err != nil {
+			// Rolling upgrade: old publishers send the raw reason
+			// string instead of JSON.
+			event = agentsdk.ReinitializationEvent{
+				WorkspaceID: workspaceID,
+				Reason:      agentsdk.ReinitializationReason(payload),
+			}
 		}
 
 		select {
 		case <-ctx.Done():
-			return
 		case <-inner.Done():
-			return
-		case reinitEvents <- claim:
+		case reinitEvents <- event:
 		}
 	})
 	if err != nil {
-		return func() {}, xerrors.Errorf("failed to subscribe to prebuild claimed channel: %w", err)
+		return nil, func() {}, xerrors.Errorf("failed to subscribe to prebuild claimed channel: %w", err)
 	}
 
 	var once sync.Once
@@ -78,5 +91,5 @@ func (p PubsubWorkspaceClaimListener) ListenForWorkspaceClaims(ctx context.Conte
 		cancel()
 	}()
 
-	return cancel, nil
+	return reinitEvents, cancel, nil
 }
+11 -12
@@ -25,24 +25,26 @@ func TestPubsubWorkspaceClaimPublisher(t *testing.T) {
 		logger := testutil.Logger(t)
 		ps := pubsub.NewInMemory()
 		workspaceID := uuid.New()
-		reinitEvents := make(chan agentsdk.ReinitializationEvent, 1)
 		publisher := prebuilds.NewPubsubWorkspaceClaimPublisher(ps)
 		listener := prebuilds.NewPubsubWorkspaceClaimListener(ps, logger)
 
-		cancel, err := listener.ListenForWorkspaceClaims(ctx, workspaceID, reinitEvents)
+		events, cancel, err := listener.ListenForWorkspaceClaims(ctx, workspaceID)
 		require.NoError(t, err)
 		defer cancel()
 
+		userID := uuid.New()
 		claim := agentsdk.ReinitializationEvent{
 			WorkspaceID: workspaceID,
 			Reason:      agentsdk.ReinitializeReasonPrebuildClaimed,
+			OwnerID:     userID,
 		}
 		err = publisher.PublishWorkspaceClaim(claim)
 		require.NoError(t, err)
 
-		gotEvent := testutil.RequireReceive(ctx, t, reinitEvents)
+		gotEvent := testutil.RequireReceive(ctx, t, events)
 		require.Equal(t, workspaceID, gotEvent.WorkspaceID)
 		require.Equal(t, claim.Reason, gotEvent.Reason)
+		require.Equal(t, userID, gotEvent.OwnerID)
 	})
 
 	t.Run("fail to publish claim", func(t *testing.T) {
@@ -69,10 +71,8 @@ func TestPubsubWorkspaceClaimListener(t *testing.T) {
 		ps := pubsub.NewInMemory()
 		listener := prebuilds.NewPubsubWorkspaceClaimListener(ps, slogtest.Make(t, nil))
 
-		claims := make(chan agentsdk.ReinitializationEvent, 1) // Buffer to avoid messing with goroutines in the rest of the test
-
 		workspaceID := uuid.New()
-		cancelFunc, err := listener.ListenForWorkspaceClaims(context.Background(), workspaceID, claims)
+		events, cancelFunc, err := listener.ListenForWorkspaceClaims(context.Background(), workspaceID)
 		require.NoError(t, err)
 		defer cancelFunc()
 
@@ -84,9 +84,10 @@ func TestPubsubWorkspaceClaimListener(t *testing.T) {
 
 		// Verify we receive the claim
 		ctx := testutil.Context(t, testutil.WaitShort)
-		claim := testutil.RequireReceive(ctx, t, claims)
+		claim := testutil.RequireReceive(ctx, t, events)
 		require.Equal(t, workspaceID, claim.WorkspaceID)
 		require.Equal(t, reason, claim.Reason)
+		require.Equal(t, uuid.Nil, claim.OwnerID)
 	})
 
 	t.Run("ignores claim events for other workspaces", func(t *testing.T) {
@@ -95,10 +96,9 @@ func TestPubsubWorkspaceClaimListener(t *testing.T) {
 		ps := pubsub.NewInMemory()
 		listener := prebuilds.NewPubsubWorkspaceClaimListener(ps, slogtest.Make(t, nil))
 
-		claims := make(chan agentsdk.ReinitializationEvent)
 		workspaceID := uuid.New()
 		otherWorkspaceID := uuid.New()
-		cancelFunc, err := listener.ListenForWorkspaceClaims(context.Background(), workspaceID, claims)
+		events, cancelFunc, err := listener.ListenForWorkspaceClaims(context.Background(), workspaceID)
 		require.NoError(t, err)
 		defer cancelFunc()
 
@@ -109,7 +109,7 @@ func TestPubsubWorkspaceClaimListener(t *testing.T) {
 
 		// Verify we don't receive the claim
 		select {
-		case <-claims:
+		case <-events:
 			t.Fatal("received claim for wrong workspace")
 		case <-time.After(100 * time.Millisecond):
 			// Expected - no claim received
@@ -119,11 +119,10 @@ func TestPubsubWorkspaceClaimListener(t *testing.T) {
 	t.Run("communicates the error if it can't subscribe", func(t *testing.T) {
 		t.Parallel()
 
-		claims := make(chan agentsdk.ReinitializationEvent)
 		ps := &brokenPubsub{}
 		listener := prebuilds.NewPubsubWorkspaceClaimListener(ps, slogtest.Make(t, nil))
 
-		_, err := listener.ListenForWorkspaceClaims(context.Background(), uuid.New(), claims)
+		_, _, err := listener.ListenForWorkspaceClaims(context.Background(), uuid.New())
 		require.ErrorContains(t, err, "failed to subscribe to prebuild claimed channel")
 	})
 }
@@ -2539,6 +2539,7 @@ func (s *server) completeWorkspaceBuildJob(ctx context.Context, job database.Pro
 			err = prebuilds.NewPubsubWorkspaceClaimPublisher(s.Pubsub).PublishWorkspaceClaim(agentsdk.ReinitializationEvent{
 				WorkspaceID: workspace.ID,
 				Reason:      agentsdk.ReinitializeReasonPrebuildClaimed,
+				OwnerID:     workspace.OwnerID,
 			})
 			if err != nil {
 				s.Logger.Error(ctx, "failed to publish workspace claim event", slog.Error(err))
@@ -51,7 +51,6 @@ import (
 	"github.com/coder/coder/v2/coderd/usage/usagetypes"
 	"github.com/coder/coder/v2/coderd/wspubsub"
 	"github.com/coder/coder/v2/codersdk"
-	"github.com/coder/coder/v2/codersdk/agentsdk"
 	"github.com/coder/coder/v2/provisionerd/proto"
 	"github.com/coder/coder/v2/provisionersdk"
 	sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
@@ -2787,8 +2786,7 @@ func TestCompleteJob(t *testing.T) {
 			require.NoError(t, err)
 
 			// GIVEN something is listening to process workspace reinitialization:
-			reinitChan := make(chan agentsdk.ReinitializationEvent, 1) // Buffered to simplify test structure
-			cancel, err := agplprebuilds.NewPubsubWorkspaceClaimListener(ps, testutil.Logger(t)).ListenForWorkspaceClaims(ctx, workspace.ID, reinitChan)
+			reinitChan, cancel, err := agplprebuilds.NewPubsubWorkspaceClaimListener(ps, testutil.Logger(t)).ListenForWorkspaceClaims(ctx, workspace.ID)
 			require.NoError(t, err)
 			defer cancel()
 
+100 -3
@@ -1465,7 +1465,9 @@ func (api *API) workspaceAgentPostLogSource(rw http.ResponseWriter, r *http.Requ
 // @Security CoderSessionToken
 // @Produce json
 // @Tags Agents
+// @Param wait query bool false "Opt in to durable reinit checks"
 // @Success 200 {object} agentsdk.ReinitializationEvent
+// @Failure 409 {object} codersdk.Response
 // @Router /workspaceagents/me/reinit [get]
 func (api *API) workspaceAgentReinit(rw http.ResponseWriter, r *http.Request) {
 	// Allow us to interrupt watch via cancel.
@@ -1482,18 +1484,113 @@ func (api *API) workspaceAgentReinit(rw http.ResponseWriter, r *http.Request) {
 	if err != nil {
 		log.Error(ctx, "failed to retrieve workspace from agent token", slog.Error(err))
 		httpapi.InternalServerError(rw, xerrors.New("failed to determine workspace from agent token"))
+		return
 	}
 
+	log = log.With(slog.F("workspace_id", workspace.ID))
 	log.Info(ctx, "agent waiting for reinit instruction")
 
-	reinitEvents := make(chan agentsdk.ReinitializationEvent)
-	cancel, err = prebuilds.NewPubsubWorkspaceClaimListener(api.Pubsub, log).ListenForWorkspaceClaims(ctx, workspace.ID, reinitEvents)
+	// Subscribe to claim events BEFORE any durable checks to avoid a
+	// TOCTOU race: without this, a claim could fire between the
+	// IsPrebuild() check and the subscribe call, and we'd miss the
+	// pubsub event entirely. By subscribing first, any event that
+	// fires during the checks below is buffered in the channel.
+	pubsubCh, cancelSub, err := prebuilds.NewPubsubWorkspaceClaimListener(api.Pubsub, log).ListenForWorkspaceClaims(ctx, workspace.ID)
 	if err != nil {
 		log.Error(ctx, "subscribe to prebuild claimed channel", slog.Error(err))
 		httpapi.InternalServerError(rw, xerrors.New("failed to subscribe to prebuild claimed channel"))
 		return
 	}
-	defer cancel()
+	defer cancelSub()
+
+	reinitEvents := pubsubCh
+
+	// Only perform the durable claim check when the agent opts in via
+	// the "wait" query parameter. Older agents don't send the
+	// "wait" query parameter and lack the duplicate-reinit guard, so
+	// they would enter an infinite reinit loop if we pre-seeded the
+	// channel on every connection.
+	waitParam, _ := strconv.ParseBool(r.URL.Query().Get("wait"))
+	if waitParam && !workspace.IsPrebuild() {
+		firstBuild, err := api.Database.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx,
+			database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
+				WorkspaceID: workspace.ID,
+				BuildNumber: 1,
+			})
+		if err != nil {
+			log.Error(ctx, "failed to get first workspace build", slog.Error(err))
+			httpapi.InternalServerError(rw, xerrors.New("failed to get first workspace build"))
+			return
+		}
+
+		if firstBuild.InitiatorID != database.PrebuildsSystemUserID {
+			// Not a claimed prebuild — this is a regular workspace.
+			// Return 409 so the agent stops reconnecting to this
+			// endpoint.
+			httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{
+				Message: "Workspace is not a prebuilt workspace waiting to be claimed.",
+				Detail:  "This endpoint is only for agents running in prebuilt workspaces.",
+			})
+			return
+		}
+
+		// This workspace was a prebuild that got claimed. Check if
+		// the claim build completed successfully before sending
+		// reinit. We assume the latest build is the claim build
+		// (build 2). If a third build (e.g. a restart) starts
+		// between the claim and the agent's reconnection, this
+		// would check that build instead. The window is extremely
+		// small in practice, and a restart would trigger its own
+		// reinit path.
+		latestBuild, err := api.Database.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspace.ID)
+		if err != nil {
+			log.Error(ctx, "failed to get latest workspace build", slog.Error(err))
+			httpapi.InternalServerError(rw, xerrors.New("failed to get latest workspace build"))
+			return
+		}
+		job, err := api.Database.GetProvisionerJobByID(ctx, latestBuild.JobID)
+		if err != nil {
+			log.Error(ctx, "failed to get provisioner job", slog.Error(err))
+			httpapi.InternalServerError(rw, xerrors.New("failed to get provisioner job"))
+			return
+		}
+
+		if job.CompletedAt.Valid && !job.Error.Valid {
+			// Claim build succeeded — cancel the pubsub
+			// subscription (no longer needed) and swap in a
+			// pre-seeded channel so the transmitter delivers
+			// exactly one reinit event.
+			cancelSub()
+			seeded := make(chan agentsdk.ReinitializationEvent, 1)
+			seeded <- agentsdk.ReinitializationEvent{
+				WorkspaceID: workspace.ID,
+				Reason:      agentsdk.ReinitializeReasonPrebuildClaimed,
+				OwnerID:     workspace.OwnerID,
+			}
+			reinitEvents = seeded
+		} else if job.CompletedAt.Valid && job.Error.Valid {
+			// Claim build failed permanently. Return 409 so the
+			// agent treats this as terminal and stops retrying
+			// (WaitForReinitLoop exits on any 409).
+			cancelSub()
+			log.Warn(ctx, "claim build failed",
+				slog.F("job_id", job.ID),
+				slog.F("error", job.Error.String))
+			httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{
+				Message: "Claim build failed permanently.",
+				Detail:  job.Error.String,
+			})
+			return
+		}
+		// Claim build still in progress — fall through to the
+		// transmitter. The pubsub subscription (set up above)
+		// will deliver the event when the build completes
+		// successfully. Note: FailJob does not publish a claim
+		// event, so a failed in-progress build will leave the
+		// agent blocking here until it disconnects and
+		// reconnects (at which point the durable check above
+		// handles it).
+	}
 
 	transmitter := agentsdk.NewSSEAgentReinitTransmitter(log, rw, r)
+190 -35
@@ -2,6 +2,7 @@ package coderd_test
 
 import (
 	"context"
+	"database/sql"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -3278,51 +3279,205 @@ func TestAgentConnectionInfo(t *testing.T) {
 func TestReinit(t *testing.T) {
 	t.Parallel()
 
-	db, ps := dbtestutil.NewDB(t)
-	pubsubSpy := pubsubReinitSpy{
-		Pubsub:           ps,
-		triedToSubscribe: make(chan string),
+	// Helper to create the prebuilds system user's workspace (an
+	// unclaimed prebuild) and return the build result. The first
+	// build's InitiatorID defaults to PrebuildsSystemUserID via
+	// dbfake.
+	setupPrebuildWorkspace := func(t *testing.T, db database.Store, orgID uuid.UUID) dbfake.WorkspaceResponse {
+		t.Helper()
+		return dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
+			OrganizationID: orgID,
+			OwnerID:        database.PrebuildsSystemUserID,
+		}).WithAgent().Do()
 	}
-	client := coderdtest.New(t, &coderdtest.Options{
-		Database: db,
-		Pubsub:   &pubsubSpy,
+
+	// Helper to simulate claiming a prebuild: change the workspace
+	// owner to the real user and create a second (claim) build.
+	claimPrebuild := func(t *testing.T, db database.Store, sqlDB *sql.DB, ws database.WorkspaceTable, claimerID uuid.UUID, templateVersionID uuid.UUID, complete bool) dbfake.WorkspaceResponse {
+		t.Helper()
+
+		// Change the workspace owner to the claiming user.
+		_, err := sqlDB.Exec("UPDATE workspaces SET owner_id = $1 WHERE id = $2", claimerID, ws.ID)
+		require.NoError(t, err)
+
+		// Update the in-memory workspace to reflect the new owner
+		// so that dbfake uses it for the second build.
+		ws.OwnerID = claimerID
+
+		builder := dbfake.WorkspaceBuild(t, db, ws).
+			Seed(database.WorkspaceBuild{
+				TemplateVersionID: templateVersionID,
+				BuildNumber:       2,
+				InitiatorID:       claimerID,
+				Transition:        database.WorkspaceTransitionStart,
+			}).
+			WithAgent()
+		if !complete {
+			builder = builder.Starting()
+		}
+		return builder.Do()
+	}
+
+	t.Run("unclaimed prebuild receives reinit via pubsub", func(t *testing.T) {
+		t.Parallel()
+
+		db, ps := dbtestutil.NewDB(t)
+		pubsubSpy := pubsubReinitSpy{
+			Pubsub:           ps,
+			triedToSubscribe: make(chan string),
+		}
+		client := coderdtest.New(t, &coderdtest.Options{
+			Database: db,
+			Pubsub:   &pubsubSpy,
+		})
+		user := coderdtest.CreateFirstUser(t, client)
+
+		r := setupPrebuildWorkspace(t, db, user.OrganizationID)
+
+		pubsubSpy.Lock()
+		pubsubSpy.expectedEvent = agentsdk.PrebuildClaimedChannel(r.Workspace.ID)
+		pubsubSpy.Unlock()
+
+		agentCtx := testutil.Context(t, testutil.WaitShort)
+		agentClient := agentsdk.New(client.URL, agentsdk.WithFixedToken(r.AgentToken))
+
+		agentReinitializedCh := make(chan *agentsdk.ReinitializationEvent)
+		go func() {
+			reinitEvent, err := agentClient.WaitForReinit(agentCtx)
+			assert.NoError(t, err)
+			agentReinitializedCh <- reinitEvent
+		}()
+
+		// We need to subscribe before we publish, lest we miss the
+		// event.
+		ctx := testutil.Context(t, testutil.WaitShort)
+		testutil.TryReceive(ctx, t, pubsubSpy.triedToSubscribe)
+
+		// Now that we're subscribed, publish the event.
+		err := prebuilds.NewPubsubWorkspaceClaimPublisher(ps).PublishWorkspaceClaim(agentsdk.ReinitializationEvent{
+			WorkspaceID: r.Workspace.ID,
+			Reason:      agentsdk.ReinitializeReasonPrebuildClaimed,
+		})
+		require.NoError(t, err)
+
+		ctx = testutil.Context(t, testutil.WaitShort)
+		reinitEvent := testutil.TryReceive(ctx, t, agentReinitializedCh)
+		require.NotNil(t, reinitEvent)
+		require.Equal(t, r.Workspace.ID, reinitEvent.WorkspaceID)
 	})
-	user := coderdtest.CreateFirstUser(t, client)
-
-	r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
-		OrganizationID: user.OrganizationID,
-		OwnerID:        user.UserID,
-	}).WithAgent().Do()
-
-	pubsubSpy.Lock()
-	pubsubSpy.expectedEvent = agentsdk.PrebuildClaimedChannel(r.Workspace.ID)
-	pubsubSpy.Unlock()
-
-	agentCtx := testutil.Context(t, testutil.WaitShort)
-	agentClient := agentsdk.New(client.URL, agentsdk.WithFixedToken(r.AgentToken))
-
-	agentReinitializedCh := make(chan *agentsdk.ReinitializationEvent)
-	go func() {
-		reinitEvent, err := agentClient.WaitForReinit(agentCtx)
-		assert.NoError(t, err)
-		agentReinitializedCh <- reinitEvent
-	}()
-
-	// We need to subscribe before we publish, lest we miss the event
-	ctx := testutil.Context(t, testutil.WaitShort)
-	testutil.TryReceive(ctx, t, pubsubSpy.triedToSubscribe)
-
-	// Now that we're subscribed, publish the event
-	err := prebuilds.NewPubsubWorkspaceClaimPublisher(ps).PublishWorkspaceClaim(agentsdk.ReinitializationEvent{
-		WorkspaceID: r.Workspace.ID,
-		Reason:      agentsdk.ReinitializeReasonPrebuildClaimed,
+
+	// Verifies the durable claim check: when an agent reconnects
+	// after missing the pubsub event, the handler detects that the
+	// workspace was originally a prebuild (first build initiated by
+	// PrebuildsSystemUserID), is now claimed (owner changed), and
+	// the claim build completed, so it sends a one-shot reinit
+	// event immediately.
+	t.Run("claimed prebuild receives one-shot reinit on reconnect", func(t *testing.T) {
+		t.Parallel()
+
+		db, ps, sqlDB := dbtestutil.NewDBWithSQLDB(t)
+		client := coderdtest.New(t, &coderdtest.Options{
+			Database: db,
+			Pubsub:   ps,
+		})
+		user := coderdtest.CreateFirstUser(t, client)
+
+		// Create an unclaimed prebuild (build 1, completed).
+		r := setupPrebuildWorkspace(t, db, user.OrganizationID)
+
+		// Claim it: change owner + create build 2 (completed).
+		claimR := claimPrebuild(t, db, sqlDB, r.Workspace, user.UserID, r.TemplateVersion.ID, true)
+
+		agentCtx := testutil.Context(t, testutil.WaitShort)
+		agentClient := agentsdk.New(client.URL, agentsdk.WithFixedToken(claimR.AgentToken))
+
+		agentReinitializedCh := make(chan *agentsdk.ReinitializationEvent)
+		go func() {
+			reinitEvent, err := agentClient.WaitForReinit(agentCtx)
+			assert.NoError(t, err)
+			agentReinitializedCh <- reinitEvent
+		}()
+
+		// The agent should receive a reinit event immediately from
+		// the durable claim check — no pubsub publish needed.
+		ctx := testutil.Context(t, testutil.WaitShort)
+		reinitEvent := testutil.TryReceive(ctx, t, agentReinitializedCh)
+		require.NotNil(t, reinitEvent)
+		require.Equal(t, r.Workspace.ID, reinitEvent.WorkspaceID)
+		require.Equal(t, agentsdk.ReinitializeReasonPrebuildClaimed, reinitEvent.Reason)
+		require.Equal(t, user.UserID, reinitEvent.OwnerID)
 	})
-	require.NoError(t, err)
-
-	ctx = testutil.Context(t, testutil.WaitShort)
-	reinitEvent := testutil.TryReceive(ctx, t, agentReinitializedCh)
-	require.NotNil(t, reinitEvent)
-	require.Equal(t, r.Workspace.ID, reinitEvent.WorkspaceID)
+
+	// Verifies that when the claim build completed with an error,
+	// the handler returns 409 so the agent treats it as terminal
+	// and stops retrying (WaitForReinitLoop exits on any 409).
+	t.Run("failed claim build returns terminal 409", func(t *testing.T) {
+		t.Parallel()
+
+		db, ps, sqlDB := dbtestutil.NewDBWithSQLDB(t)
+		client := coderdtest.New(t, &coderdtest.Options{
+			Database: db,
+			Pubsub:   ps,
+		})
+		user := coderdtest.CreateFirstUser(t, client)
+
+		// Create an unclaimed prebuild (build 1, completed).
+		r := setupPrebuildWorkspace(t, db, user.OrganizationID)
+
+		// Claim it: create build 2 as completed (so agent rows
+		// exist and the token is valid for auth).
+		claimR := claimPrebuild(t, db, sqlDB, r.Workspace, user.UserID, r.TemplateVersion.ID, true)
+
+		// Simulate a claim build failure: set an error on the
+		// provisioner job. This models the case where terraform
+		// apply partially succeeded (creating resources/agents)
+		// but ultimately errored.
+		_, err := sqlDB.Exec(
+			"UPDATE provisioner_jobs SET error = 'simulated claim failure' WHERE id = $1",
+			claimR.Build.JobID,
+		)
+		require.NoError(t, err)
+
+		agentCtx := testutil.Context(t, testutil.WaitShort)
+		agentClient := agentsdk.New(client.URL, agentsdk.WithFixedToken(claimR.AgentToken))
+
+		_, err = agentClient.WaitForReinit(agentCtx)
+		require.Error(t, err)
+		var sdkErr *codersdk.Error
+		require.ErrorAs(t, err, &sdkErr)
require.Equal(t, http.StatusConflict, sdkErr.StatusCode())
})
// Verifies that a regular workspace (never a prebuild) gets a
// 409 Conflict response, causing the agent's reinit loop to
// close the channel gracefully.
t.Run("regular workspace gets 409", func(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
client := coderdtest.New(t, &coderdtest.Options{
Database: db,
Pubsub: ps,
})
user := coderdtest.CreateFirstUser(t, client)
// Create a regular workspace (not a prebuild). The first
// build's initiator will be the user, not the prebuilds
// system user.
r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
OrganizationID: user.OrganizationID,
OwnerID: user.UserID,
}).WithAgent().Do()
agentCtx := testutil.Context(t, testutil.WaitShort)
agentClient := agentsdk.New(client.URL, agentsdk.WithFixedToken(r.AgentToken))
// WaitForReinit should return an error wrapping a 409.
_, err := agentClient.WaitForReinit(agentCtx)
require.Error(t, err)
var sdkErr *codersdk.Error
require.ErrorAs(t, err, &sdkErr)
require.Equal(t, http.StatusConflict, sdkErr.StatusCode())
})
} }
type pubsubReinitSpy struct { type pubsubReinitSpy struct {
+19 -2
@@ -3,6 +3,7 @@ package agentsdk
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -706,8 +707,9 @@ const (
 )
 type ReinitializationEvent struct {
-	WorkspaceID uuid.UUID
+	WorkspaceID uuid.UUID              `json:"workspace_id" format:"uuid"`
 	Reason      ReinitializationReason `json:"reason"`
+	OwnerID     uuid.UUID              `json:"owner_id,omitzero" format:"uuid"`
 }
 func PrebuildClaimedChannel(id uuid.UUID) string {
@@ -722,6 +724,9 @@ func (c *Client) WaitForReinit(ctx context.Context) (*ReinitializationEvent, err
 	if err != nil {
 		return nil, xerrors.Errorf("parse url: %w", err)
 	}
+	q := rpcURL.Query()
+	q.Set("wait", "true")
+	rpcURL.RawQuery = q.Encode()
 	httpClient := &http.Client{
 		Transport: c.SDK.HTTPClient.Transport,
@@ -750,21 +755,33 @@ func (c *Client) WaitForReinit(ctx context.Context) (*ReinitializationEvent, err
 	return reinitEvent, nil
 }
+// WaitForReinitLoop polls the /reinit SSE endpoint in a retry loop and
+// forwards received reinitialization events to the returned channel. The
+// channel is closed when ctx is canceled or the server returns 409
+// Conflict (indicating the workspace is not a prebuilt workspace or the
+// claim build failed permanently). The caller should select on both the
+// channel and ctx.Done().
 func WaitForReinitLoop(ctx context.Context, logger slog.Logger, client *Client) <-chan ReinitializationEvent {
 	reinitEvents := make(chan ReinitializationEvent)
 	go func() {
+		defer close(reinitEvents)
 		for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
 			logger.Debug(ctx, "waiting for agent reinitialization instructions")
 			reinitEvent, err := client.WaitForReinit(ctx)
 			if err != nil {
+				var sdkErr *codersdk.Error
+				if errors.As(err, &sdkErr) && sdkErr.StatusCode() == http.StatusConflict {
+					logger.Info(ctx, "received terminal 409, stopping reinit polling",
+						slog.Error(sdkErr))
+					return
+				}
 				logger.Error(ctx, "failed to wait for agent reinitialization instructions", slog.Error(err))
 				continue
 			}
 			retrier.Reset()
 			select {
 			case <-ctx.Done():
-				close(reinitEvents)
 				return
 			case reinitEvents <- *reinitEvent:
 			}
+1
@@ -26,6 +26,7 @@ func TestStreamAgentReinitEvents(t *testing.T) {
 	eventToSend := agentsdk.ReinitializationEvent{
 		WorkspaceID: uuid.New(),
 		Reason:      agentsdk.ReinitializeReasonPrebuildClaimed,
+		OwnerID:     uuid.New(),
 	}
 	events := make(chan agentsdk.ReinitializationEvent, 1)
+12 -4
@@ -483,22 +483,30 @@ curl -X GET http://coder-server:8080/api/v2/workspaceagents/me/reinit \
 `GET /workspaceagents/me/reinit`
+### Parameters
+| Name   | In    | Type    | Required | Description                     |
+|--------|-------|---------|----------|---------------------------------|
+| `wait` | query | boolean | false    | Opt in to durable reinit checks |
 ### Example responses
 > 200 Response
 ```json
 {
+  "owner_id": "8826ee2e-7933-4665-aef2-2393f84a0d05",
   "reason": "prebuild_claimed",
-  "workspaceID": "string"
+  "workspace_id": "0967198e-ec7b-4c6b-b4d3-f71244cadbe9"
 }
 ```
 ### Responses
-| Status | Meaning                                                 | Description | Schema                                                                     |
-|--------|---------------------------------------------------------|-------------|----------------------------------------------------------------------------|
-| 200    | [OK](https://tools.ietf.org/html/rfc7231#section-6.3.1) | OK          | [agentsdk.ReinitializationEvent](schemas.md#agentsdkreinitializationevent) |
+| Status | Meaning                                                       | Description | Schema                                                                     |
+|--------|---------------------------------------------------------------|-------------|----------------------------------------------------------------------------|
+| 200    | [OK](https://tools.ietf.org/html/rfc7231#section-6.3.1)       | OK          | [agentsdk.ReinitializationEvent](schemas.md#agentsdkreinitializationevent) |
+| 409    | [Conflict](https://tools.ietf.org/html/rfc7231#section-6.5.8) | Conflict    | [codersdk.Response](schemas.md#codersdkresponse)                           |
 To perform this operation, you must be authenticated. [Learn more](authentication.md).
+7 -5
@@ -186,17 +186,19 @@
 ```json
 {
+  "owner_id": "8826ee2e-7933-4665-aef2-2393f84a0d05",
   "reason": "prebuild_claimed",
-  "workspaceID": "string"
+  "workspace_id": "0967198e-ec7b-4c6b-b4d3-f71244cadbe9"
 }
 ```
 ### Properties
-| Name          | Type                                                               | Required | Restrictions | Description |
-|---------------|--------------------------------------------------------------------|----------|--------------|-------------|
-| `reason`      | [agentsdk.ReinitializationReason](#agentsdkreinitializationreason) | false    |              |             |
-| `workspaceID` | string                                                             | false    |              |             |
+| Name           | Type                                                               | Required | Restrictions | Description |
+|----------------|--------------------------------------------------------------------|----------|--------------|-------------|
+| `owner_id`     | string                                                             | false    |              |             |
+| `reason`       | [agentsdk.ReinitializationReason](#agentsdkreinitializationreason) | false    |              |             |
+| `workspace_id` | string                                                             | false    |              |             |
 ## agentsdk.ReinitializationReason