feat: add a dependency management graph for agents (#20208)

Relates to https://github.com/coder/internal/issues/1093

This is the first of N pull requests to allow coder script ordering.
It introduces what is for now dead code, but paves the way for various
interfaces that allow coder scripts and other processes to depend on one
another via CLI commands and terraform configurations.

The next step is to add reactivity to the graph, such that changes in
the status of one vertex will propagate and allow other vertices to
change their own statuses.

Concurrency and stress testing yield the following:

CPU Profile:
<img width="1512" height="862" alt="Screenshot 2025-10-17 at 10 38 52"
src="https://github.com/user-attachments/assets/f46cf1a2-a0b2-4c02-81a0-069798108ee5"
/>

Mem Profile:
<img width="1512" height="862" alt="Screenshot 2025-10-17 at 10 38 01"
src="https://github.com/user-attachments/assets/45be1235-fff6-45ba-a50d-db9880377bd0"
/>

Predictably, lock contention and memory allocation are the largest
components of this system under stress. Nothing seems untoward.
This commit is contained in:
Sas Swart
2025-10-24 16:18:16 +02:00
committed by GitHub
parent 51d3abb904
commit 6c621364f8
9 changed files with 671 additions and 0 deletions
+3
View File
@@ -12,6 +12,9 @@ node_modules/
vendor/ vendor/
yarn-error.log yarn-error.log
# Test output files
test-output/
# VSCode settings. # VSCode settings.
**/.vscode/* **/.vscode/*
# Allow VSCode recommendations and default settings in project root. # Allow VSCode recommendations and default settings in project root.
+5
View File
@@ -676,6 +676,7 @@ gen/db: $(DB_GEN_FILES)
.PHONY: gen/db .PHONY: gen/db
gen/golden-files: \ gen/golden-files: \
agent/unit/testdata/.gen-golden \
cli/testdata/.gen-golden \ cli/testdata/.gen-golden \
coderd/.gen-golden \ coderd/.gen-golden \
coderd/notifications/.gen-golden \ coderd/notifications/.gen-golden \
@@ -952,6 +953,10 @@ clean/golden-files:
-type f -name '*.golden' -delete -type f -name '*.golden' -delete
.PHONY: clean/golden-files .PHONY: clean/golden-files
agent/unit/testdata/.gen-golden: $(wildcard agent/unit/testdata/*.golden) $(GO_SRC_FILES) $(wildcard agent/unit/*_test.go)
TZ=UTC go test ./agent/unit -run="TestGraph" -update
touch "$@"
cli/testdata/.gen-golden: $(wildcard cli/testdata/*.golden) $(wildcard cli/*.tpl) $(GO_SRC_FILES) $(wildcard cli/*_test.go) cli/testdata/.gen-golden: $(wildcard cli/testdata/*.golden) $(wildcard cli/*.tpl) $(GO_SRC_FILES) $(wildcard cli/*_test.go)
TZ=UTC go test ./cli -run="Test(CommandHelp|ServerYAML|ErrorExamples|.*Golden)" -update TZ=UTC go test ./cli -run="Test(CommandHelp|ServerYAML|ErrorExamples|.*Golden)" -update
touch "$@" touch "$@"
+174
View File
@@ -0,0 +1,174 @@
package unit
import (
"fmt"
"sync"
"golang.org/x/xerrors"
"gonum.org/v1/gonum/graph/encoding/dot"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"
)
// Graph provides a bidirectional interface over gonum's directed graph implementation.
// While the underlying gonum graph is directed, we overlay bidirectional semantics
// by distinguishing between forward and reverse edges. Wanting and being wanted by
// other units are related but different concepts that have different graph traversal
// implications when Units update their status.
//
// The graph stores edge types to represent different relationships between units,
// allowing for domain-specific semantics beyond simple connectivity.
type Graph[EdgeType, VertexType comparable] struct {
mu sync.RWMutex
// The underlying gonum graph. It stores vertices and edges without knowing about the types of the vertices and edges.
gonumGraph *simple.DirectedGraph
// Maps vertices to their IDs so that a gonum vertex ID can be used to lookup the vertex type.
vertexToID map[VertexType]int64
// Maps vertex IDs to their types so that a vertex type can be used to lookup the gonum vertex ID.
idToVertex map[int64]VertexType
// The next ID to assign to a vertex.
nextID int64
// Store edge types by "fromID->toID" key. This is used to lookup the edge type for a given edge.
edgeTypes map[string]EdgeType
}
// Edge is a convenience type for representing an edge in the graph.
// It encapsulates the from and to vertices and the edge type itself.
type Edge[EdgeType, VertexType comparable] struct {
From VertexType
To VertexType
Edge EdgeType
}
// AddEdge adds an edge to the graph. It initializes the graph and metadata on first use,
// checks for cycles, and adds the edge to the gonum graph.
func (g *Graph[EdgeType, VertexType]) AddEdge(from, to VertexType, edge EdgeType) error {
g.mu.Lock()
defer g.mu.Unlock()
if g.gonumGraph == nil {
g.gonumGraph = simple.NewDirectedGraph()
g.vertexToID = make(map[VertexType]int64)
g.idToVertex = make(map[int64]VertexType)
g.edgeTypes = make(map[string]EdgeType)
g.nextID = 1
}
fromID := g.getOrCreateVertexID(from)
toID := g.getOrCreateVertexID(to)
if g.canReach(to, from) {
return xerrors.Errorf("adding edge (%v -> %v) would create a cycle", from, to)
}
g.gonumGraph.SetEdge(simple.Edge{F: simple.Node(fromID), T: simple.Node(toID)})
edgeKey := fmt.Sprintf("%d->%d", fromID, toID)
g.edgeTypes[edgeKey] = edge
return nil
}
// GetForwardAdjacentVertices returns all the edges that originate from the given vertex.
func (g *Graph[EdgeType, VertexType]) GetForwardAdjacentVertices(from VertexType) []Edge[EdgeType, VertexType] {
g.mu.RLock()
defer g.mu.RUnlock()
fromID, exists := g.vertexToID[from]
if !exists {
return []Edge[EdgeType, VertexType]{}
}
edges := []Edge[EdgeType, VertexType]{}
toNodes := g.gonumGraph.From(fromID)
for toNodes.Next() {
toID := toNodes.Node().ID()
to := g.idToVertex[toID]
// Get the edge type
edgeKey := fmt.Sprintf("%d->%d", fromID, toID)
edgeType := g.edgeTypes[edgeKey]
edges = append(edges, Edge[EdgeType, VertexType]{From: from, To: to, Edge: edgeType})
}
return edges
}
// GetReverseAdjacentVertices returns all the edges that terminate at the given vertex.
func (g *Graph[EdgeType, VertexType]) GetReverseAdjacentVertices(to VertexType) []Edge[EdgeType, VertexType] {
g.mu.RLock()
defer g.mu.RUnlock()
toID, exists := g.vertexToID[to]
if !exists {
return []Edge[EdgeType, VertexType]{}
}
edges := []Edge[EdgeType, VertexType]{}
fromNodes := g.gonumGraph.To(toID)
for fromNodes.Next() {
fromID := fromNodes.Node().ID()
from := g.idToVertex[fromID]
// Get the edge type
edgeKey := fmt.Sprintf("%d->%d", fromID, toID)
edgeType := g.edgeTypes[edgeKey]
edges = append(edges, Edge[EdgeType, VertexType]{From: from, To: to, Edge: edgeType})
}
return edges
}
// getOrCreateVertexID returns the ID for a vertex, creating it if it doesn't exist.
func (g *Graph[EdgeType, VertexType]) getOrCreateVertexID(vertex VertexType) int64 {
if id, exists := g.vertexToID[vertex]; exists {
return id
}
id := g.nextID
g.nextID++
g.vertexToID[vertex] = id
g.idToVertex[id] = vertex
// Add the node to the gonum graph
g.gonumGraph.AddNode(simple.Node(id))
return id
}
// canReach checks if there is a path from the start vertex to the end vertex.
func (g *Graph[EdgeType, VertexType]) canReach(start, end VertexType) bool {
if start == end {
return true
}
startID, startExists := g.vertexToID[start]
endID, endExists := g.vertexToID[end]
if !startExists || !endExists {
return false
}
// Use gonum's built-in path existence check
return topo.PathExistsIn(g.gonumGraph, simple.Node(startID), simple.Node(endID))
}
// ToDOT exports the graph to DOT format for visualization
func (g *Graph[EdgeType, VertexType]) ToDOT(name string) (string, error) {
g.mu.RLock()
defer g.mu.RUnlock()
if g.gonumGraph == nil {
return "", xerrors.New("graph is not initialized")
}
// Marshal the graph to DOT format
dotBytes, err := dot.Marshal(g.gonumGraph, name, "", " ")
if err != nil {
return "", xerrors.Errorf("failed to marshal graph to DOT: %w", err)
}
return string(dotBytes), nil
}
+454
View File
@@ -0,0 +1,454 @@
// Package unit_test provides tests for the unit package.
//
// DOT Graph Testing:
// The graph tests use golden files for DOT representation verification.
// To update the golden files:
// make gen/golden-files
//
// The golden files contain the expected DOT representation and can be easily
// inspected, version controlled, and updated when the graph structure changes.
package unit_test
import (
"bytes"
"flag"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/agent/unit"
"github.com/coder/coder/v2/cryptorand"
)
type testGraphEdge string
const (
testEdgeStarted testGraphEdge = "started"
testEdgeCompleted testGraphEdge = "completed"
)
type testGraphVertex struct {
Name string
}
type (
testGraph = unit.Graph[testGraphEdge, *testGraphVertex]
testEdge = unit.Edge[testGraphEdge, *testGraphVertex]
)
// randInt generates a random integer in the range [0, limit).
func randInt(limit int) int {
if limit <= 0 {
return 0
}
n, err := cryptorand.Int63n(int64(limit))
if err != nil {
return 0
}
return int(n)
}
// UpdateGoldenFiles indicates golden files should be updated.
// To update the golden files:
// make gen/golden-files
var UpdateGoldenFiles = flag.Bool("update", false, "update .golden files")
// assertDOTGraph requires that the graph's DOT representation matches the golden file
func assertDOTGraph(t *testing.T, graph *testGraph, goldenName string) {
t.Helper()
dot, err := graph.ToDOT(goldenName)
require.NoError(t, err)
goldenFile := filepath.Join("testdata", goldenName+".golden")
if *UpdateGoldenFiles {
t.Logf("update golden file for: %q: %s", goldenName, goldenFile)
err := os.MkdirAll(filepath.Dir(goldenFile), 0o755)
require.NoError(t, err, "want no error creating golden file directory")
err = os.WriteFile(goldenFile, []byte(dot), 0o600)
require.NoError(t, err, "update golden file")
}
expected, err := os.ReadFile(goldenFile)
require.NoError(t, err, "read golden file, run \"make gen/golden-files\" and commit the changes")
// Normalize line endings for cross-platform compatibility
expected = normalizeLineEndings(expected)
normalizedDot := normalizeLineEndings([]byte(dot))
assert.Empty(t, cmp.Diff(string(expected), string(normalizedDot)), "golden file mismatch (-want +got): %s, run \"make gen/golden-files\", verify and commit the changes", goldenFile)
}
// normalizeLineEndings ensures that all line endings are normalized to \n.
// Required for Windows compatibility.
func normalizeLineEndings(content []byte) []byte {
content = bytes.ReplaceAll(content, []byte("\r\n"), []byte("\n"))
content = bytes.ReplaceAll(content, []byte("\r"), []byte("\n"))
return content
}
func TestGraph(t *testing.T) {
t.Parallel()
testFuncs := map[string]func(t *testing.T) *unit.Graph[testGraphEdge, *testGraphVertex]{
"ForwardAndReverseEdges": func(t *testing.T) *unit.Graph[testGraphEdge, *testGraphVertex] {
graph := &unit.Graph[testGraphEdge, *testGraphVertex]{}
unit1 := &testGraphVertex{Name: "unit1"}
unit2 := &testGraphVertex{Name: "unit2"}
unit3 := &testGraphVertex{Name: "unit3"}
err := graph.AddEdge(unit1, unit2, testEdgeCompleted)
require.NoError(t, err)
err = graph.AddEdge(unit1, unit3, testEdgeStarted)
require.NoError(t, err)
// Check for forward edge
vertices := graph.GetForwardAdjacentVertices(unit1)
require.Len(t, vertices, 2)
// Unit 1 depends on the completion of Unit2
require.Contains(t, vertices, testEdge{
From: unit1,
To: unit2,
Edge: testEdgeCompleted,
})
// Unit 1 depends on the start of Unit3
require.Contains(t, vertices, testEdge{
From: unit1,
To: unit3,
Edge: testEdgeStarted,
})
// Check for reverse edges
unit2ReverseEdges := graph.GetReverseAdjacentVertices(unit2)
require.Len(t, unit2ReverseEdges, 1)
// Unit 2 must be completed before Unit 1 can start
require.Contains(t, unit2ReverseEdges, testEdge{
From: unit1,
To: unit2,
Edge: testEdgeCompleted,
})
unit3ReverseEdges := graph.GetReverseAdjacentVertices(unit3)
require.Len(t, unit3ReverseEdges, 1)
// Unit 3 must be started before Unit 1 can complete
require.Contains(t, unit3ReverseEdges, testEdge{
From: unit1,
To: unit3,
Edge: testEdgeStarted,
})
return graph
},
"SelfReference": func(t *testing.T) *testGraph {
graph := &testGraph{}
unit1 := &testGraphVertex{Name: "unit1"}
err := graph.AddEdge(unit1, unit1, testEdgeCompleted)
require.Error(t, err)
require.ErrorContains(t, err, fmt.Sprintf("adding edge (%v -> %v) would create a cycle", unit1, unit1))
return graph
},
"Cycle": func(t *testing.T) *testGraph {
graph := &testGraph{}
unit1 := &testGraphVertex{Name: "unit1"}
unit2 := &testGraphVertex{Name: "unit2"}
err := graph.AddEdge(unit1, unit2, testEdgeCompleted)
require.NoError(t, err)
err = graph.AddEdge(unit2, unit1, testEdgeStarted)
require.Error(t, err)
require.ErrorContains(t, err, fmt.Sprintf("adding edge (%v -> %v) would create a cycle", unit2, unit1))
return graph
},
"MultipleDependenciesSameStatus": func(t *testing.T) *testGraph {
graph := &testGraph{}
unit1 := &testGraphVertex{Name: "unit1"}
unit2 := &testGraphVertex{Name: "unit2"}
unit3 := &testGraphVertex{Name: "unit3"}
unit4 := &testGraphVertex{Name: "unit4"}
// Unit1 depends on completion of both unit2 and unit3 (same status type)
err := graph.AddEdge(unit1, unit2, testEdgeCompleted)
require.NoError(t, err)
err = graph.AddEdge(unit1, unit3, testEdgeCompleted)
require.NoError(t, err)
// Unit1 also depends on starting of unit4 (different status type)
err = graph.AddEdge(unit1, unit4, testEdgeStarted)
require.NoError(t, err)
// Check that unit1 has 3 forward dependencies
forwardEdges := graph.GetForwardAdjacentVertices(unit1)
require.Len(t, forwardEdges, 3)
// Verify all expected dependencies exist
expectedDependencies := []testEdge{
{From: unit1, To: unit2, Edge: testEdgeCompleted},
{From: unit1, To: unit3, Edge: testEdgeCompleted},
{From: unit1, To: unit4, Edge: testEdgeStarted},
}
for _, expected := range expectedDependencies {
require.Contains(t, forwardEdges, expected)
}
// Check reverse dependencies
unit2ReverseEdges := graph.GetReverseAdjacentVertices(unit2)
require.Len(t, unit2ReverseEdges, 1)
require.Contains(t, unit2ReverseEdges, testEdge{
From: unit1, To: unit2, Edge: testEdgeCompleted,
})
unit3ReverseEdges := graph.GetReverseAdjacentVertices(unit3)
require.Len(t, unit3ReverseEdges, 1)
require.Contains(t, unit3ReverseEdges, testEdge{
From: unit1, To: unit3, Edge: testEdgeCompleted,
})
unit4ReverseEdges := graph.GetReverseAdjacentVertices(unit4)
require.Len(t, unit4ReverseEdges, 1)
require.Contains(t, unit4ReverseEdges, testEdge{
From: unit1, To: unit4, Edge: testEdgeStarted,
})
return graph
},
}
for testName, testFunc := range testFuncs {
var graph *testGraph
t.Run(testName, func(t *testing.T) {
t.Parallel()
graph = testFunc(t)
assertDOTGraph(t, graph, testName)
})
}
}
func TestGraphThreadSafety(t *testing.T) {
t.Parallel()
t.Run("ConcurrentReadWrite", func(t *testing.T) {
t.Parallel()
graph := &testGraph{}
var wg sync.WaitGroup
const numWriters = 50
const numReaders = 100
const operationsPerWriter = 1000
const operationsPerReader = 2000
barrier := make(chan struct{})
// Launch writers
for i := 0; i < numWriters; i++ {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
<-barrier
for j := 0; j < operationsPerWriter; j++ {
from := &testGraphVertex{Name: fmt.Sprintf("writer-%d-%d", writerID, j)}
to := &testGraphVertex{Name: fmt.Sprintf("writer-%d-%d", writerID, j+1)}
graph.AddEdge(from, to, testEdgeCompleted)
}
}(i)
}
// Launch readers
readerResults := make([]struct {
panicked bool
readCount int
}, numReaders)
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
<-barrier
defer func() {
if r := recover(); r != nil {
readerResults[readerID].panicked = true
}
}()
readCount := 0
for j := 0; j < operationsPerReader; j++ {
// Create a test vertex and read
testUnit := &testGraphVertex{Name: fmt.Sprintf("test-reader-%d-%d", readerID, j)}
forwardEdges := graph.GetForwardAdjacentVertices(testUnit)
reverseEdges := graph.GetReverseAdjacentVertices(testUnit)
// Just verify no panics (results may be nil for non-existent vertices)
_ = forwardEdges
_ = reverseEdges
readCount++
}
readerResults[readerID].readCount = readCount
}(i)
}
close(barrier)
wg.Wait()
// Verify no panics occurred in readers
for i, result := range readerResults {
require.False(t, result.panicked, "reader %d panicked", i)
require.Equal(t, operationsPerReader, result.readCount, "reader %d should have performed expected reads", i)
}
})
t.Run("ConcurrentCycleDetection", func(t *testing.T) {
t.Parallel()
graph := &testGraph{}
// Pre-create chain: A→B→C→D
unitA := &testGraphVertex{Name: "A"}
unitB := &testGraphVertex{Name: "B"}
unitC := &testGraphVertex{Name: "C"}
unitD := &testGraphVertex{Name: "D"}
err := graph.AddEdge(unitA, unitB, testEdgeCompleted)
require.NoError(t, err)
err = graph.AddEdge(unitB, unitC, testEdgeCompleted)
require.NoError(t, err)
err = graph.AddEdge(unitC, unitD, testEdgeCompleted)
require.NoError(t, err)
barrier := make(chan struct{})
var wg sync.WaitGroup
const numGoroutines = 50
cycleErrors := make([]error, numGoroutines)
// Launch goroutines trying to add D→A (creates cycle)
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
<-barrier
err := graph.AddEdge(unitD, unitA, testEdgeCompleted)
cycleErrors[goroutineID] = err
}(i)
}
close(barrier)
wg.Wait()
// Verify all attempts correctly returned cycle error
for i, err := range cycleErrors {
require.Error(t, err, "goroutine %d should have detected cycle", i)
require.Contains(t, err.Error(), "would create a cycle")
}
// Verify graph remains valid (original chain intact)
dot, err := graph.ToDOT("test")
require.NoError(t, err)
require.NotEmpty(t, dot)
})
t.Run("ConcurrentToDOT", func(t *testing.T) {
t.Parallel()
graph := &testGraph{}
// Pre-populate graph
for i := 0; i < 20; i++ {
from := &testGraphVertex{Name: fmt.Sprintf("dot-unit-%d", i)}
to := &testGraphVertex{Name: fmt.Sprintf("dot-unit-%d", i+1)}
err := graph.AddEdge(from, to, testEdgeCompleted)
require.NoError(t, err)
}
barrier := make(chan struct{})
var wg sync.WaitGroup
const numReaders = 100
const numWriters = 20
dotResults := make([]string, numReaders)
// Launch readers calling ToDOT
dotErrors := make([]error, numReaders)
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
<-barrier
dot, err := graph.ToDOT(fmt.Sprintf("test-%d", readerID))
dotErrors[readerID] = err
if err == nil {
dotResults[readerID] = dot
}
}(i)
}
// Launch writers adding edges
for i := 0; i < numWriters; i++ {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
<-barrier
from := &testGraphVertex{Name: fmt.Sprintf("writer-dot-%d", writerID)}
to := &testGraphVertex{Name: fmt.Sprintf("writer-dot-target-%d", writerID)}
graph.AddEdge(from, to, testEdgeCompleted)
}(i)
}
close(barrier)
wg.Wait()
// Verify no errors occurred during DOT generation
for i, err := range dotErrors {
require.NoError(t, err, "DOT generation error at index %d", i)
}
// Verify all DOT results are valid
for i, dot := range dotResults {
require.NotEmpty(t, dot, "DOT result %d should not be empty", i)
}
})
}
func BenchmarkGraph_ConcurrentMixedOperations(b *testing.B) {
graph := &testGraph{}
var wg sync.WaitGroup
const numGoroutines = 200
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Launch goroutines performing random operations
for j := 0; j < numGoroutines; j++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
operationCount := 0
for operationCount < 50 {
operation := float32(randInt(100)) / 100.0
if operation < 0.6 { // 60% reads
// Read operation
testUnit := &testGraphVertex{Name: fmt.Sprintf("bench-read-%d-%d", goroutineID, operationCount)}
forwardEdges := graph.GetForwardAdjacentVertices(testUnit)
reverseEdges := graph.GetReverseAdjacentVertices(testUnit)
// Just verify no panics (results may be nil for non-existent vertices)
_ = forwardEdges
_ = reverseEdges
} else { // 40% writes
// Write operation
from := &testGraphVertex{Name: fmt.Sprintf("bench-write-%d-%d", goroutineID, operationCount)}
to := &testGraphVertex{Name: fmt.Sprintf("bench-write-target-%d-%d", goroutineID, operationCount)}
graph.AddEdge(from, to, testEdgeCompleted)
}
operationCount++
}
}(j)
}
wg.Wait()
}
}
+8
View File
@@ -0,0 +1,8 @@
strict digraph Cycle {
// Node definitions.
1;
2;
// Edge definitions.
1 -> 2;
}
+10
View File
@@ -0,0 +1,10 @@
strict digraph ForwardAndReverseEdges {
// Node definitions.
1;
2;
3;
// Edge definitions.
1 -> 2;
1 -> 3;
}
@@ -0,0 +1,12 @@
strict digraph MultipleDependenciesSameStatus {
// Node definitions.
1;
2;
3;
4;
// Edge definitions.
1 -> 2;
1 -> 3;
1 -> 4;
}
+4
View File
@@ -0,0 +1,4 @@
strict digraph SelfReference {
// Node definitions.
1;
}
+1
View File
@@ -486,6 +486,7 @@ require (
github.com/go-git/go-git/v5 v5.16.2 github.com/go-git/go-git/v5 v5.16.2
github.com/icholy/replace v0.6.0 github.com/icholy/replace v0.6.0
github.com/mark3labs/mcp-go v0.38.0 github.com/mark3labs/mcp-go v0.38.0
gonum.org/v1/gonum v0.16.0
) )
require ( require (