Files
Sas Swart ce627bf23f feat: implement agent socket api, client and cli (#20758)
closes: https://github.com/coder/coder/issues/10352
closes: https://github.com/coder/internal/issues/1094
closes: https://github.com/coder/internal/issues/1095

In this pull request, we enable a new set of experimental cli commands
grouped under `coder exp sync`.
These commands allow any process acting within a coder workspace to
inform the coder agent of its requirements and execution progress. The
coder agent will then relay this information to other processes that
have subscribed.

These commands are:
```
# Check if this feature is enabled in your environment 
coder exp sync ping

# express that your unit depends on another
coder exp sync want <unit> <dependency_unit> 

# express that your unit intends to start a portion of the script that requires 
# other units to have completed first. This command blocks until all dependencies have been met
coder exp sync start <unit> 

# express that your unit has completes its work, allowing dependent units to begin their execution
coder exp sync complete <unit>
```

Example:

In order to automatically run claude code in a new workspace, it must
first have a git repository cloned. The scripts responsible for cloning
the repository and for running claude code would coordinate in the
following way:

```bash
# Script A: Claude code

# Inform the agent that the claude script wants the git script.
# That is, the git script must have completed before the claude script can begin its execution
coder exp sync want claude git

# Inform the agent that we would now like to begin execution of claude.
# This command will block until the git script (and any other defined dependencies)
# have completed
coder exp sync start claude

# Now we run claude code and any other commands we need
claude ...

# Once our script has completed, we inform the agent, so that any scripts that depend on this one
# may begin their execution

coder exp sync complete claude
```

```bash
# Script B: Git

# Because the git script does not have any dependencies, we can simply inform the agent that we 
# intend to start
coder exp sync start git

git clone ssh://git@github.com/coder/coder

# Once the repository have been cloned, we inform the agent that this script is complete, so that
# scripts that depend on it may begin their execution.
coder exp sync complete git
```

Notes:
* Unit names (ie. `claude` and `git`) given as input to the sync
commands are arbitrary strings. You do not have to conform to specific
identifiers. We recommend naming your scripts descriptively, but
succinctly.
* Scripts unit names should be well documented. Other scripts will need
to know the names you've chosen in order to depend on yours. Therefore,
you

---------

Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
2025-11-28 08:33:50 +02:00

291 lines
8.1 KiB
Go

package unit
import (
"errors"
"fmt"
"sync"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/coderd/util/slice"
)
var (
ErrUnitIDRequired = xerrors.New("unit name is required")
ErrUnitNotFound = xerrors.New("unit not found")
ErrUnitAlreadyRegistered = xerrors.New("unit already registered")
ErrCannotUpdateOtherUnit = xerrors.New("cannot update other unit's status")
ErrDependenciesNotSatisfied = xerrors.New("unit dependencies not satisfied")
ErrSameStatusAlreadySet = xerrors.New("same status already set")
ErrCycleDetected = xerrors.New("cycle detected")
ErrFailedToAddDependency = xerrors.New("failed to add dependency")
)
// Status represents the status of a unit.
type Status string
var _ fmt.Stringer = Status("")
func (s Status) String() string {
if s == StatusNotRegistered {
return "not registered"
}
return string(s)
}
// Status constants for dependency tracking.
const (
StatusNotRegistered Status = ""
StatusPending Status = "pending"
StatusStarted Status = "started"
StatusComplete Status = "completed"
)
// ID provides a type narrowed representation of the unique identifier of a unit.
type ID string
// Unit represents a point-in-time snapshot of a vertex in the dependency graph.
// Units may depend on other units, or be depended on by other units. The unit struct
// is not aware of updates made to the dependency graph after it is initialized and should
// not be cached.
type Unit struct {
id ID
status Status
// ready is true if all dependencies are satisfied.
// It does not have an accessor method on Unit, because a unit cannot know whether it is ready.
// Only the Manager can calculate whether a unit is ready based on knowledge of the dependency graph.
// To discourage use of an outdated readiness value, only the Manager should set and return this field.
ready bool
}
func (u Unit) ID() ID {
return u.id
}
func (u Unit) Status() Status {
return u.status
}
// Dependency represents a dependency relationship between units.
type Dependency struct {
Unit ID
DependsOn ID
RequiredStatus Status
CurrentStatus Status
IsSatisfied bool
}
// Manager provides reactive dependency tracking over a Graph.
// It manages Unit registration, dependency relationships, and status updates
// with automatic recalculation of readiness when dependencies are satisfied.
type Manager struct {
mu sync.RWMutex
// The underlying graph that stores dependency relationships
graph *Graph[Status, ID]
// Store vertex instances for each unit to ensure consistent references
units map[ID]Unit
}
// NewManager creates a new Manager instance.
func NewManager() *Manager {
return &Manager{
graph: &Graph[Status, ID]{},
units: make(map[ID]Unit),
}
}
// Register adds a unit to the manager if it is not already registered.
// If a Unit is already registered (per the ID field), it is not updated.
func (m *Manager) Register(id ID) error {
m.mu.Lock()
defer m.mu.Unlock()
if id == "" {
return xerrors.Errorf("registering unit %q: %w", id, ErrUnitIDRequired)
}
if m.registered(id) {
return xerrors.Errorf("registering unit %q: %w", id, ErrUnitAlreadyRegistered)
}
m.units[id] = Unit{
id: id,
status: StatusPending,
ready: true,
}
return nil
}
// registered checks if a unit is registered in the manager.
func (m *Manager) registered(id ID) bool {
return m.units[id].status != StatusNotRegistered
}
// Unit fetches a unit from the manager. If the unit does not exist,
// it returns the Unit zero-value as a placeholder unit, because
// units may depend on other units that have not yet been created.
func (m *Manager) Unit(id ID) (Unit, error) {
if id == "" {
return Unit{}, xerrors.Errorf("unit ID cannot be empty: %w", ErrUnitIDRequired)
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.units[id], nil
}
func (m *Manager) IsReady(id ID) (bool, error) {
if id == "" {
return false, xerrors.Errorf("unit ID cannot be empty: %w", ErrUnitIDRequired)
}
m.mu.RLock()
defer m.mu.RUnlock()
if !m.registered(id) {
return true, nil
}
return m.units[id].ready, nil
}
// AddDependency adds a dependency relationship between units.
// The unit depends on the dependsOn unit reaching the requiredStatus.
func (m *Manager) AddDependency(unit ID, dependsOn ID, requiredStatus Status) error {
m.mu.Lock()
defer m.mu.Unlock()
switch {
case unit == "":
return xerrors.Errorf("dependent name cannot be empty: %w", ErrUnitIDRequired)
case dependsOn == "":
return xerrors.Errorf("dependency name cannot be empty: %w", ErrUnitIDRequired)
case !m.registered(unit):
return xerrors.Errorf("dependent unit %q must be registered first: %w", unit, ErrUnitNotFound)
}
// Add the dependency edge to the graph
// The edge goes from unit to dependsOn, representing the dependency
err := m.graph.AddEdge(unit, dependsOn, requiredStatus)
if err != nil {
return xerrors.Errorf("adding edge for unit %q: %w", unit, errors.Join(ErrFailedToAddDependency, err))
}
// Recalculate readiness for the unit since it now has a new dependency
m.recalculateReadinessUnsafe(unit)
return nil
}
// UpdateStatus updates a unit's status and recalculates readiness for affected dependents.
func (m *Manager) UpdateStatus(unit ID, newStatus Status) error {
m.mu.Lock()
defer m.mu.Unlock()
switch {
case unit == "":
return xerrors.Errorf("updating status for unit %q: %w", unit, ErrUnitIDRequired)
case !m.registered(unit):
return xerrors.Errorf("unit %q must be registered first: %w", unit, ErrUnitNotFound)
}
u := m.units[unit]
if u.status == newStatus {
return xerrors.Errorf("checking status for unit %q: %w", unit, ErrSameStatusAlreadySet)
}
u.status = newStatus
m.units[unit] = u
// Get all units that depend on this one (reverse adjacent vertices)
dependents := m.graph.GetReverseAdjacentVertices(unit)
// Recalculate readiness for all dependents
for _, dependent := range dependents {
m.recalculateReadinessUnsafe(dependent.From)
}
return nil
}
// recalculateReadinessUnsafe recalculates the readiness state for a unit.
// This method assumes the caller holds the write lock.
func (m *Manager) recalculateReadinessUnsafe(unit ID) {
u := m.units[unit]
dependencies := m.graph.GetForwardAdjacentVertices(unit)
allSatisfied := true
for _, dependency := range dependencies {
requiredStatus := dependency.Edge
dependsOnUnit := m.units[dependency.To]
if dependsOnUnit.status != requiredStatus {
allSatisfied = false
break
}
}
u.ready = allSatisfied
m.units[unit] = u
}
// GetGraph returns the underlying graph for visualization and debugging.
// This should be used carefully as it exposes the internal graph structure.
func (m *Manager) GetGraph() *Graph[Status, ID] {
return m.graph
}
// GetAllDependencies returns all dependencies for a unit, both satisfied and unsatisfied.
func (m *Manager) GetAllDependencies(unit ID) ([]Dependency, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if unit == "" {
return nil, xerrors.Errorf("unit ID cannot be empty: %w", ErrUnitIDRequired)
}
if !m.registered(unit) {
return nil, xerrors.Errorf("checking registration for unit %q: %w", unit, ErrUnitNotFound)
}
dependencies := m.graph.GetForwardAdjacentVertices(unit)
var allDependencies []Dependency
for _, dependency := range dependencies {
dependsOnUnit := m.units[dependency.To]
requiredStatus := dependency.Edge
allDependencies = append(allDependencies, Dependency{
Unit: unit,
DependsOn: dependency.To,
RequiredStatus: requiredStatus,
CurrentStatus: dependsOnUnit.status,
IsSatisfied: dependsOnUnit.status == requiredStatus,
})
}
return allDependencies, nil
}
// GetUnmetDependencies returns a list of unsatisfied dependencies for a unit.
func (m *Manager) GetUnmetDependencies(unit ID) ([]Dependency, error) {
allDependencies, err := m.GetAllDependencies(unit)
if err != nil {
return nil, err
}
var unmetDependencies []Dependency = slice.Filter(allDependencies, func(dependency Dependency) bool {
return !dependency.IsSatisfied
})
return unmetDependencies, nil
}
// ExportDOT exports the dependency graph to DOT format for visualization.
func (m *Manager) ExportDOT(name string) (string, error) {
return m.graph.ToDOT(name)
}