mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
ce627bf23f
closes: https://github.com/coder/coder/issues/10352 closes: https://github.com/coder/internal/issues/1094 closes: https://github.com/coder/internal/issues/1095 In this pull request, we enable a new set of experimental cli commands grouped under `coder exp sync`. These commands allow any process acting within a coder workspace to inform the coder agent of its requirements and execution progress. The coder agent will then relay this information to other processes that have subscribed. These commands are: ``` # Check if this feature is enabled in your environment coder exp sync ping # express that your unit depends on another coder exp sync want <unit> <dependency_unit> # express that your unit intends to start a portion of the script that requires # other units to have completed first. This command blocks until all dependencies have been met coder exp sync start <unit> # express that your unit has completed its work, allowing dependent units to begin their execution coder exp sync complete <unit> ``` Example: In order to automatically run claude code in a new workspace, it must first have a git repository cloned. The scripts responsible for cloning the repository and for running claude code would coordinate in the following way: ```bash # Script A: Claude code # Inform the agent that the claude script wants the git script. # That is, the git script must have completed before the claude script can begin its execution coder exp sync want claude git # Inform the agent that we would now like to begin execution of claude. # This command will block until the git script (and any other defined dependencies) # have completed coder exp sync start claude # Now we run claude code and any other commands we need claude ... 
# Once our script has completed, we inform the agent, so that any scripts that depend on this one # may begin their execution coder exp sync complete claude ``` ```bash # Script B: Git # Because the git script does not have any dependencies, we can simply inform the agent that we # intend to start coder exp sync start git git clone ssh://git@github.com/coder/coder # Once the repository has been cloned, we inform the agent that this script is complete, so that # scripts that depend on it may begin their execution. coder exp sync complete git ``` Notes: * Unit names (i.e. `claude` and `git`) given as input to the sync commands are arbitrary strings. You do not have to conform to specific identifiers. We recommend naming your scripts descriptively, but succinctly. * Script unit names should be well documented. Other scripts will need to know the names you've chosen in order to depend on yours. Therefore, you --------- Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
390 lines
12 KiB
Go
390 lines
12 KiB
Go
package agentsocket_test
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"cdr.dev/slog"
|
|
"github.com/coder/coder/v2/agent/agentsocket"
|
|
"github.com/coder/coder/v2/agent/unit"
|
|
"github.com/coder/coder/v2/testutil"
|
|
)
|
|
|
|
// tempDirUnixSocket returns a temporary directory that can safely hold unix
|
|
// sockets (probably).
|
|
//
|
|
// During tests on darwin we hit the max path length limit for unix sockets
|
|
// pretty easily in the default location, so this function uses /tmp instead to
|
|
// get shorter paths. To keep paths short, we use a hash of the test name
|
|
// instead of the full test name.
|
|
func tempDirUnixSocket(t *testing.T) string {
|
|
t.Helper()
|
|
if runtime.GOOS == "darwin" {
|
|
// Use a short hash of the test name to keep the path under 104 chars
|
|
hash := sha256.Sum256([]byte(t.Name()))
|
|
hashStr := hex.EncodeToString(hash[:])[:8] // Use first 8 chars of hash
|
|
dir, err := os.MkdirTemp("/tmp", fmt.Sprintf("c-%s-", hashStr))
|
|
require.NoError(t, err, "create temp dir for unix socket test")
|
|
t.Cleanup(func() {
|
|
err := os.RemoveAll(dir)
|
|
assert.NoError(t, err, "remove temp dir", dir)
|
|
})
|
|
return dir
|
|
}
|
|
return t.TempDir()
|
|
}
|
|
|
|
// newSocketClient creates a DRPC client connected to the Unix socket at the given path.
|
|
func newSocketClient(ctx context.Context, t *testing.T, socketPath string) *agentsocket.Client {
|
|
t.Helper()
|
|
|
|
client, err := agentsocket.NewClient(ctx, agentsocket.WithPath(socketPath))
|
|
t.Cleanup(func() {
|
|
_ = client.Close()
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
return client
|
|
}
|
|
|
|
func TestDRPCAgentSocketService(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
if runtime.GOOS == "windows" {
|
|
t.Skip("agentsocket is not supported on Windows")
|
|
}
|
|
|
|
t.Run("Ping", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
err = client.Ping(ctx)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("SyncStart", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("NewUnit", func(t *testing.T) {
|
|
t.Parallel()
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
})
|
|
|
|
t.Run("UnitAlreadyStarted", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// First Start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Second Start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.ErrorContains(t, err, unit.ErrSameStatusAlreadySet.Error())
|
|
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
})
|
|
|
|
t.Run("UnitAlreadyCompleted", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// First start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Complete the unit
|
|
err = client.SyncComplete(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusComplete, status.Status)
|
|
|
|
// Second start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
})
|
|
|
|
t.Run("UnitNotReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.ErrorContains(t, err, "unit not ready")
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusPending, status.Status)
|
|
require.False(t, status.IsReady)
|
|
})
|
|
})
|
|
|
|
t.Run("SyncWant", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("NewUnits", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// If dependency units are not registered, they are registered automatically
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Len(t, status.Dependencies, 1)
|
|
require.Equal(t, unit.ID("dependency-unit"), status.Dependencies[0].DependsOn)
|
|
require.Equal(t, unit.StatusComplete, status.Dependencies[0].RequiredStatus)
|
|
})
|
|
|
|
t.Run("DependencyAlreadyRegistered", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Start the dependency unit
|
|
err = client.SyncStart(ctx, "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "dependency-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Add the dependency after the dependency unit has already started
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
|
|
// Dependencies can be added even if the dependency unit has already started
|
|
require.NoError(t, err)
|
|
|
|
// The dependency is now reflected in the test unit's status
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.ID("dependency-unit"), status.Dependencies[0].DependsOn)
|
|
require.Equal(t, unit.StatusComplete, status.Dependencies[0].RequiredStatus)
|
|
})
|
|
|
|
t.Run("DependencyAddedAfterDependentStarted", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Start the dependent unit
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Add the dependency after the dependency unit has already started
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
|
|
// Dependencies can be added even if the dependent unit has already started.
|
|
// The dependency applies the next time a unit is started. The current status is not updated.
|
|
// This is to allow flexible dependency management. It does mean that users of this API should
|
|
// take care to add dependencies before they start their dependent units.
|
|
require.NoError(t, err)
|
|
|
|
// The dependency is now reflected in the test unit's status
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.ID("dependency-unit"), status.Dependencies[0].DependsOn)
|
|
require.Equal(t, unit.StatusComplete, status.Dependencies[0].RequiredStatus)
|
|
})
|
|
})
|
|
|
|
t.Run("SyncReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("UnregisteredUnit", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
ready, err := client.SyncReady(ctx, "unregistered-unit")
|
|
require.NoError(t, err)
|
|
require.True(t, ready)
|
|
})
|
|
|
|
t.Run("UnitNotReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Register a unit with an unsatisfied dependency
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Check readiness - should be false because dependency is not satisfied
|
|
ready, err := client.SyncReady(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.False(t, ready)
|
|
})
|
|
|
|
t.Run("UnitReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := filepath.Join(tempDirUnixSocket(t), "test.sock")
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Register a unit with no dependencies - should be ready immediately
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Check readiness - should be true
|
|
ready, err := client.SyncReady(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.True(t, ready)
|
|
|
|
// Also test a unit with satisfied dependencies
|
|
err = client.SyncWant(ctx, "dependent-unit", "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Complete the dependency
|
|
err = client.SyncComplete(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Now dependent-unit should be ready
|
|
ready, err = client.SyncReady(ctx, "dependent-unit")
|
|
require.NoError(t, err)
|
|
require.True(t, ready)
|
|
})
|
|
})
|
|
}
|