Files
coder/coderd/workspaceupdates.go
Spike Curtis bddb808b25 chore: arrange imports in a standard way (#21452)
Fixes all our Go file imports to match the preferred spec that we've _mostly_ been using. For example:

```
import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/xerrors"
	"gopkg.in/natefinch/lumberjack.v2"

	"cdr.dev/slog/v3"
	"github.com/coder/coder/v2/codersdk/agentsdk"
	"github.com/coder/serpent"
)
```

3 groups: standard library, 3rd partly libs, Coder libs.

This PR makes the change across the codebase. The PR in the stack above modifies our formatting to maintain this state of affairs, and is a separate PR so it's possible to review that one in detail.
2026-01-08 15:24:11 +04:00

312 lines
8.2 KiB
Go

package coderd
import (
"context"
"fmt"
"sync"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/coderd/wspubsub"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
)
type UpdatesQuerier interface {
// GetAuthorizedWorkspacesAndAgentsByOwnerID requires a context with an actor set
GetWorkspacesAndAgentsByOwnerID(ctx context.Context, ownerID uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error)
GetWorkspaceByAgentID(ctx context.Context, agentID uuid.UUID) (database.Workspace, error)
}
type workspacesByID = map[uuid.UUID]ownedWorkspace
type ownedWorkspace struct {
WorkspaceName string
Status proto.Workspace_Status
Agents []database.AgentIDNamePair
}
// Equal does not compare agents
func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
return w.WorkspaceName == other.WorkspaceName &&
w.Status == other.Status
}
type sub struct {
// ALways contains an actor
ctx context.Context
cancelFn context.CancelFunc
mu sync.RWMutex
userID uuid.UUID
ch chan *proto.WorkspaceUpdate
prev workspacesByID
db UpdatesQuerier
ps pubsub.Pubsub
logger slog.Logger
psCancelFn func()
}
func (s *sub) handleEvent(ctx context.Context, event wspubsub.WorkspaceEvent, err error) {
s.mu.Lock()
defer s.mu.Unlock()
switch event.Kind {
case wspubsub.WorkspaceEventKindStateChange:
case wspubsub.WorkspaceEventKindAgentConnectionUpdate:
case wspubsub.WorkspaceEventKindAgentTimeout:
case wspubsub.WorkspaceEventKindAgentLifecycleUpdate:
default:
if err == nil {
return
}
// Always attempt an update if the pubsub lost connection
s.logger.Warn(ctx, "failed to handle workspace event", slog.Error(err))
}
// Use context containing actor
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(s.ctx, s.userID)
if err != nil {
s.logger.Warn(ctx, "failed to get workspaces and agents by owner ID", slog.Error(err))
return
}
latest := convertRows(rows)
out, updated := produceUpdate(s.prev, latest)
if !updated {
return
}
s.prev = latest
select {
case <-s.ctx.Done():
return
case s.ch <- out:
}
}
func (s *sub) start(ctx context.Context) (err error) {
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(ctx, s.userID)
if err != nil {
return xerrors.Errorf("get workspaces and agents by owner ID: %w", err)
}
latest := convertRows(rows)
initUpdate, _ := produceUpdate(workspacesByID{}, latest)
s.ch <- initUpdate
s.prev = latest
cancel, err := s.ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(s.userID), wspubsub.HandleWorkspaceEvent(s.handleEvent))
if err != nil {
return xerrors.Errorf("subscribe to workspace event channel: %w", err)
}
s.psCancelFn = cancel
return nil
}
func (s *sub) Close() error {
s.cancelFn()
if s.psCancelFn != nil {
s.psCancelFn()
}
close(s.ch)
return nil
}
func (s *sub) Updates() <-chan *proto.WorkspaceUpdate {
return s.ch
}
var _ tailnet.Subscription = (*sub)(nil)
type updatesProvider struct {
ps pubsub.Pubsub
logger slog.Logger
db UpdatesQuerier
auth rbac.Authorizer
ctx context.Context
cancelFn func()
}
var _ tailnet.WorkspaceUpdatesProvider = (*updatesProvider)(nil)
func NewUpdatesProvider(
logger slog.Logger,
ps pubsub.Pubsub,
db UpdatesQuerier,
auth rbac.Authorizer,
) tailnet.WorkspaceUpdatesProvider {
ctx, cancel := context.WithCancel(context.Background())
out := &updatesProvider{
auth: auth,
db: db,
ps: ps,
logger: logger,
ctx: ctx,
cancelFn: cancel,
}
return out
}
func (u *updatesProvider) Close() error {
u.cancelFn()
return nil
}
// Subscribe subscribes to workspace updates for a user, for the workspaces
// that user is authorized to `ActionRead` on. The provided context must have
// a dbauthz actor set.
func (u *updatesProvider) Subscribe(ctx context.Context, userID uuid.UUID) (tailnet.Subscription, error) {
actor, ok := dbauthz.ActorFromContext(ctx)
if !ok {
return nil, xerrors.Errorf("actor not found in context")
}
ctx, cancel := context.WithCancel(u.ctx)
ctx = dbauthz.As(ctx, actor)
ch := make(chan *proto.WorkspaceUpdate, 1)
sub := &sub{
ctx: ctx,
cancelFn: cancel,
userID: userID,
ch: ch,
db: u.db,
ps: u.ps,
logger: u.logger.Named(fmt.Sprintf("workspace_updates_subscriber_%s", userID)),
prev: workspacesByID{},
}
err := sub.start(ctx)
if err != nil {
_ = sub.Close()
return nil, err
}
return sub, nil
}
func produceUpdate(oldWS, newWS workspacesByID) (out *proto.WorkspaceUpdate, updated bool) {
out = &proto.WorkspaceUpdate{
UpsertedWorkspaces: []*proto.Workspace{},
UpsertedAgents: []*proto.Agent{},
DeletedWorkspaces: []*proto.Workspace{},
DeletedAgents: []*proto.Agent{},
}
for wsID, newWorkspace := range newWS {
oldWorkspace, exists := oldWS[wsID]
// Upsert both workspace and agents if the workspace is new
if !exists {
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{
Id: tailnet.UUIDToByteSlice(wsID),
Name: newWorkspace.WorkspaceName,
Status: newWorkspace.Status,
})
for _, agent := range newWorkspace.Agents {
out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{
Id: tailnet.UUIDToByteSlice(agent.ID),
Name: agent.Name,
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
})
}
updated = true
continue
}
// Upsert workspace if the workspace is updated
if !newWorkspace.Equal(oldWorkspace) {
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{
Id: tailnet.UUIDToByteSlice(wsID),
Name: newWorkspace.WorkspaceName,
Status: newWorkspace.Status,
})
updated = true
}
add, remove := slice.SymmetricDifference(oldWorkspace.Agents, newWorkspace.Agents)
for _, agent := range add {
out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{
Id: tailnet.UUIDToByteSlice(agent.ID),
Name: agent.Name,
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
})
updated = true
}
for _, agent := range remove {
out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{
Id: tailnet.UUIDToByteSlice(agent.ID),
Name: agent.Name,
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
})
updated = true
}
}
// Delete workspace and agents if the workspace is deleted
for wsID, oldWorkspace := range oldWS {
if _, exists := newWS[wsID]; !exists {
out.DeletedWorkspaces = append(out.DeletedWorkspaces, &proto.Workspace{
Id: tailnet.UUIDToByteSlice(wsID),
Name: oldWorkspace.WorkspaceName,
Status: oldWorkspace.Status,
})
for _, agent := range oldWorkspace.Agents {
out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{
Id: tailnet.UUIDToByteSlice(agent.ID),
Name: agent.Name,
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
})
}
updated = true
}
}
return out, updated
}
func convertRows(rows []database.GetWorkspacesAndAgentsByOwnerIDRow) workspacesByID {
out := workspacesByID{}
for _, row := range rows {
agents := []database.AgentIDNamePair{}
for _, agent := range row.Agents {
agents = append(agents, database.AgentIDNamePair{
ID: agent.ID,
Name: agent.Name,
})
}
out[row.ID] = ownedWorkspace{
WorkspaceName: row.Name,
Status: tailnet.WorkspaceStatusToProto(codersdk.ConvertWorkspaceStatus(codersdk.ProvisionerJobStatus(row.JobStatus), codersdk.WorkspaceTransition(row.Transition))),
Agents: agents,
}
}
return out
}
type rbacAuthorizer struct {
sshPrep rbac.PreparedAuthorized
db UpdatesQuerier
}
func (r *rbacAuthorizer) AuthorizeTunnel(ctx context.Context, agentID uuid.UUID) error {
ws, err := r.db.GetWorkspaceByAgentID(ctx, agentID)
if err != nil {
return xerrors.Errorf("get workspace by agent ID: %w", err)
}
// Authorizes against `ActionSSH`
return r.sshPrep.Authorize(ctx, ws.RBACObject())
}
var _ tailnet.TunnelAuthorizer = (*rbacAuthorizer)(nil)