mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
7cc2b22568
relates to #21335 Adds UpdateAppStatus on the agentsocket, wired up to forward to Coderd over the dRPC connection the agent maintains. Disclosure: I used AI to generate significant portions of this PR, but hand-reviewed and tweaked the code. I consider it approximately indistinguishable from what I would have done by hand.
492 lines
14 KiB
Go
492 lines
14 KiB
Go
package agentsocket_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/xerrors"
|
|
|
|
"cdr.dev/slog/v3"
|
|
"github.com/coder/coder/v2/agent/agentsocket"
|
|
agentproto "github.com/coder/coder/v2/agent/proto"
|
|
"github.com/coder/coder/v2/agent/unit"
|
|
"github.com/coder/coder/v2/testutil"
|
|
)
|
|
|
|
// fakeAgentAPI implements just the UpdateAppStatus method of
|
|
// DRPCAgentClient28 for testing. Calling any other method will panic.
|
|
type fakeAgentAPI struct {
|
|
agentproto.DRPCAgentClient28
|
|
updateAppStatus func(context.Context, *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error)
|
|
}
|
|
|
|
func (m *fakeAgentAPI) UpdateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) {
|
|
return m.updateAppStatus(ctx, req)
|
|
}
|
|
|
|
// newSocketClient creates a DRPC client connected to the Unix socket at the given path.
|
|
func newSocketClient(ctx context.Context, t *testing.T, socketPath string) *agentsocket.Client {
|
|
t.Helper()
|
|
|
|
client, err := agentsocket.NewClient(ctx, agentsocket.WithPath(socketPath))
|
|
t.Cleanup(func() {
|
|
_ = client.Close()
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
return client
|
|
}
|
|
|
|
func TestDRPCAgentSocketService(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("Ping", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
err = client.Ping(ctx)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("SyncStart", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("NewUnit", func(t *testing.T) {
|
|
t.Parallel()
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
})
|
|
|
|
t.Run("UnitAlreadyStarted", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// First Start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Second Start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.ErrorContains(t, err, unit.ErrSameStatusAlreadySet.Error())
|
|
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
})
|
|
|
|
t.Run("UnitAlreadyCompleted", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// First start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Complete the unit
|
|
err = client.SyncComplete(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusComplete, status.Status)
|
|
|
|
// Second start
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
})
|
|
|
|
t.Run("UnitNotReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.ErrorContains(t, err, "unit not ready")
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusPending, status.Status)
|
|
require.False(t, status.IsReady)
|
|
})
|
|
})
|
|
|
|
t.Run("SyncWant", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("NewUnits", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// If dependency units are not registered, they are registered automatically
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Len(t, status.Dependencies, 1)
|
|
require.Equal(t, unit.ID("dependency-unit"), status.Dependencies[0].DependsOn)
|
|
require.Equal(t, unit.StatusComplete, status.Dependencies[0].RequiredStatus)
|
|
})
|
|
|
|
t.Run("DependencyAlreadyRegistered", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Start the dependency unit
|
|
err = client.SyncStart(ctx, "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "dependency-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Add the dependency after the dependency unit has already started
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
|
|
// Dependencies can be added even if the dependency unit has already started
|
|
require.NoError(t, err)
|
|
|
|
// The dependency is now reflected in the test unit's status
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.ID("dependency-unit"), status.Dependencies[0].DependsOn)
|
|
require.Equal(t, unit.StatusComplete, status.Dependencies[0].RequiredStatus)
|
|
})
|
|
|
|
t.Run("DependencyAddedAfterDependentStarted", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Start the dependent unit
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
status, err := client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.StatusStarted, status.Status)
|
|
|
|
// Add the dependency after the dependency unit has already started
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
|
|
// Dependencies can be added even if the dependent unit has already started.
|
|
// The dependency applies the next time a unit is started. The current status is not updated.
|
|
// This is to allow flexible dependency management. It does mean that users of this API should
|
|
// take care to add dependencies before they start their dependent units.
|
|
require.NoError(t, err)
|
|
|
|
// The dependency is now reflected in the test unit's status
|
|
status, err = client.SyncStatus(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.Equal(t, unit.ID("dependency-unit"), status.Dependencies[0].DependsOn)
|
|
require.Equal(t, unit.StatusComplete, status.Dependencies[0].RequiredStatus)
|
|
})
|
|
})
|
|
|
|
t.Run("SyncReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("UnregisteredUnit", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
ready, err := client.SyncReady(ctx, "unregistered-unit")
|
|
require.NoError(t, err)
|
|
require.True(t, ready)
|
|
})
|
|
|
|
t.Run("UnitNotReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Register a unit with an unsatisfied dependency
|
|
err = client.SyncWant(ctx, "test-unit", "dependency-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Check readiness - should be false because dependency is not satisfied
|
|
ready, err := client.SyncReady(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.False(t, ready)
|
|
})
|
|
|
|
t.Run("UnitReady", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
// Register a unit with no dependencies - should be ready immediately
|
|
err = client.SyncStart(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Check readiness - should be true
|
|
ready, err := client.SyncReady(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
require.True(t, ready)
|
|
|
|
// Also test a unit with satisfied dependencies
|
|
err = client.SyncWant(ctx, "dependent-unit", "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Complete the dependency
|
|
err = client.SyncComplete(ctx, "test-unit")
|
|
require.NoError(t, err)
|
|
|
|
// Now dependent-unit should be ready
|
|
ready, err = client.SyncReady(ctx, "dependent-unit")
|
|
require.NoError(t, err)
|
|
require.True(t, ready)
|
|
})
|
|
})
|
|
|
|
t.Run("UpdateAppStatus", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("NotConnected", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
_, err = client.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
|
Slug: "test-app",
|
|
State: agentproto.UpdateAppStatusRequest_WORKING,
|
|
Message: "doing stuff",
|
|
})
|
|
require.ErrorContains(t, err, "not connected")
|
|
})
|
|
|
|
t.Run("ForwardsToAgentAPI", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
var gotReq *agentproto.UpdateAppStatusRequest
|
|
mock := &fakeAgentAPI{
|
|
updateAppStatus: func(_ context.Context, req *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) {
|
|
gotReq = req
|
|
return &agentproto.UpdateAppStatusResponse{}, nil
|
|
},
|
|
}
|
|
server.SetAgentAPI(mock)
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
resp, err := client.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
|
Slug: "test-app",
|
|
State: agentproto.UpdateAppStatusRequest_IDLE,
|
|
Message: "all done",
|
|
Uri: "https://example.com",
|
|
})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp)
|
|
|
|
require.NotNil(t, gotReq)
|
|
require.Equal(t, "test-app", gotReq.Slug)
|
|
require.Equal(t, agentproto.UpdateAppStatusRequest_IDLE, gotReq.State)
|
|
require.Equal(t, "all done", gotReq.Message)
|
|
require.Equal(t, "https://example.com", gotReq.Uri)
|
|
})
|
|
|
|
t.Run("ForwardsError", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
mock := &fakeAgentAPI{
|
|
updateAppStatus: func(context.Context, *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) {
|
|
return nil, xerrors.New("app not found")
|
|
},
|
|
}
|
|
server.SetAgentAPI(mock)
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
_, err = client.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
|
Slug: "nonexistent",
|
|
State: agentproto.UpdateAppStatusRequest_WORKING,
|
|
Message: "testing",
|
|
})
|
|
require.ErrorContains(t, err, "app not found")
|
|
})
|
|
|
|
t.Run("ClearAgentAPI", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath := testutil.AgentSocketPath(t)
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
server, err := agentsocket.NewServer(
|
|
slog.Make().Leveled(slog.LevelDebug),
|
|
agentsocket.WithPath(socketPath),
|
|
)
|
|
require.NoError(t, err)
|
|
defer server.Close()
|
|
|
|
mock := &fakeAgentAPI{
|
|
updateAppStatus: func(context.Context, *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) {
|
|
return &agentproto.UpdateAppStatusResponse{}, nil
|
|
},
|
|
}
|
|
server.SetAgentAPI(mock)
|
|
server.ClearAgentAPI()
|
|
|
|
client := newSocketClient(ctx, t, socketPath)
|
|
|
|
_, err = client.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
|
Slug: "test-app",
|
|
State: agentproto.UpdateAppStatusRequest_WORKING,
|
|
Message: "should fail",
|
|
})
|
|
require.ErrorContains(t, err, "not connected")
|
|
})
|
|
})
|
|
}
|