mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
refactor: convert workspacesdk.AgentConn to an interface (#19392)
Fixes https://github.com/coder/internal/issues/907 We convert `workspacesdk.AgentConn` to an interface and generate a mock for it. This allows writing `coderd` tests that rely on the agent's HTTP api to not have to set up an entire tailnet networking stack.
This commit is contained in:
+3
-3
@@ -325,6 +325,9 @@ func New(options *Options) *API {
|
||||
})
|
||||
}
|
||||
|
||||
if options.PrometheusRegistry == nil {
|
||||
options.PrometheusRegistry = prometheus.NewRegistry()
|
||||
}
|
||||
if options.Authorizer == nil {
|
||||
options.Authorizer = rbac.NewCachingAuthorizer(options.PrometheusRegistry)
|
||||
if buildinfo.IsDev() {
|
||||
@@ -381,9 +384,6 @@ func New(options *Options) *API {
|
||||
if options.FilesRateLimit == 0 {
|
||||
options.FilesRateLimit = 12
|
||||
}
|
||||
if options.PrometheusRegistry == nil {
|
||||
options.PrometheusRegistry = prometheus.NewRegistry()
|
||||
}
|
||||
if options.Clock == nil {
|
||||
options.Clock = quartz.NewReal()
|
||||
}
|
||||
|
||||
+2
-2
@@ -277,9 +277,9 @@ func (s *ServerTailnet) dialContext(ctx context.Context, network, addr string) (
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *ServerTailnet) AgentConn(ctx context.Context, agentID uuid.UUID) (*workspacesdk.AgentConn, func(), error) {
|
||||
func (s *ServerTailnet) AgentConn(ctx context.Context, agentID uuid.UUID) (workspacesdk.AgentConn, func(), error) {
|
||||
var (
|
||||
conn *workspacesdk.AgentConn
|
||||
conn workspacesdk.AgentConn
|
||||
ret func()
|
||||
)
|
||||
|
||||
|
||||
@@ -0,0 +1,186 @@
|
||||
package coderd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbmock"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/httpmw"
|
||||
"github.com/coder/coder/v2/coderd/workspaceapps/appurl"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/codersdk/workspacesdk"
|
||||
"github.com/coder/coder/v2/codersdk/workspacesdk/agentconnmock"
|
||||
"github.com/coder/coder/v2/codersdk/wsjson"
|
||||
"github.com/coder/coder/v2/tailnet"
|
||||
"github.com/coder/coder/v2/tailnet/tailnettest"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
"github.com/coder/websocket"
|
||||
)
|
||||
|
||||
type fakeAgentProvider struct {
|
||||
agentConn func(ctx context.Context, agentID uuid.UUID) (_ workspacesdk.AgentConn, release func(), _ error)
|
||||
}
|
||||
|
||||
func (fakeAgentProvider) ReverseProxy(targetURL, dashboardURL *url.URL, agentID uuid.UUID, app appurl.ApplicationURL, wildcardHost string) *httputil.ReverseProxy {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (f fakeAgentProvider) AgentConn(ctx context.Context, agentID uuid.UUID) (_ workspacesdk.AgentConn, release func(), _ error) {
|
||||
if f.agentConn != nil {
|
||||
return f.agentConn(ctx, agentID)
|
||||
}
|
||||
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (fakeAgentProvider) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (fakeAgentProvider) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestWatchAgentContainers(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("WebSocketClosesProperly", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// This test ensures that the agent containers `/watch` websocket can gracefully
|
||||
// handle the underlying websocket unexpectedly closing. This test was created in
|
||||
// response to this issue: https://github.com/coder/coder/issues/19372
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitShort)
|
||||
logger = slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug).Named("coderd")
|
||||
|
||||
mCtrl = gomock.NewController(t)
|
||||
mDB = dbmock.NewMockStore(mCtrl)
|
||||
mCoordinator = tailnettest.NewMockCoordinator(mCtrl)
|
||||
mAgentConn = agentconnmock.NewMockAgentConn(mCtrl)
|
||||
|
||||
fAgentProvider = fakeAgentProvider{
|
||||
agentConn: func(ctx context.Context, agentID uuid.UUID) (_ workspacesdk.AgentConn, release func(), _ error) {
|
||||
return mAgentConn, func() {}, nil
|
||||
},
|
||||
}
|
||||
|
||||
workspaceID = uuid.New()
|
||||
agentID = uuid.New()
|
||||
resourceID = uuid.New()
|
||||
jobID = uuid.New()
|
||||
buildID = uuid.New()
|
||||
|
||||
containersCh = make(chan codersdk.WorkspaceAgentListContainersResponse)
|
||||
|
||||
r = chi.NewMux()
|
||||
|
||||
api = API{
|
||||
ctx: ctx,
|
||||
Options: &Options{
|
||||
AgentInactiveDisconnectTimeout: testutil.WaitShort,
|
||||
Database: mDB,
|
||||
Logger: logger,
|
||||
DeploymentValues: &codersdk.DeploymentValues{},
|
||||
TailnetCoordinator: tailnettest.NewFakeCoordinator(),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
var tailnetCoordinator tailnet.Coordinator = mCoordinator
|
||||
api.TailnetCoordinator.Store(&tailnetCoordinator)
|
||||
api.agentProvider = fAgentProvider
|
||||
|
||||
// Setup: Allow `ExtractWorkspaceAgentParams` to complete.
|
||||
mDB.EXPECT().GetWorkspaceAgentByID(gomock.Any(), agentID).Return(database.WorkspaceAgent{
|
||||
ID: agentID,
|
||||
ResourceID: resourceID,
|
||||
LifecycleState: database.WorkspaceAgentLifecycleStateReady,
|
||||
FirstConnectedAt: sql.NullTime{Valid: true, Time: dbtime.Now()},
|
||||
LastConnectedAt: sql.NullTime{Valid: true, Time: dbtime.Now()},
|
||||
}, nil)
|
||||
mDB.EXPECT().GetWorkspaceResourceByID(gomock.Any(), resourceID).Return(database.WorkspaceResource{
|
||||
ID: resourceID,
|
||||
JobID: jobID,
|
||||
}, nil)
|
||||
mDB.EXPECT().GetProvisionerJobByID(gomock.Any(), jobID).Return(database.ProvisionerJob{
|
||||
ID: jobID,
|
||||
Type: database.ProvisionerJobTypeWorkspaceBuild,
|
||||
}, nil)
|
||||
mDB.EXPECT().GetWorkspaceBuildByJobID(gomock.Any(), jobID).Return(database.WorkspaceBuild{
|
||||
WorkspaceID: workspaceID,
|
||||
ID: buildID,
|
||||
}, nil)
|
||||
|
||||
// And: Allow `db2dsk.WorkspaceAgent` to complete.
|
||||
mCoordinator.EXPECT().Node(gomock.Any()).Return(nil)
|
||||
|
||||
// And: Allow `WatchContainers` to be called, returing our `containersCh` channel.
|
||||
mAgentConn.EXPECT().WatchContainers(gomock.Any(), gomock.Any()).
|
||||
Return(containersCh, io.NopCloser(&bytes.Buffer{}), nil)
|
||||
|
||||
// And: We mount the HTTP Handler
|
||||
r.With(httpmw.ExtractWorkspaceAgentParam(mDB)).
|
||||
Get("/workspaceagents/{workspaceagent}/containers/watch", api.watchWorkspaceAgentContainers)
|
||||
|
||||
// Given: We create the HTTP server
|
||||
srv := httptest.NewServer(r)
|
||||
defer srv.Close()
|
||||
|
||||
// And: Dial the WebSocket
|
||||
wsURL := strings.Replace(srv.URL, "http://", "ws://", 1)
|
||||
conn, resp, err := websocket.Dial(ctx, fmt.Sprintf("%s/workspaceagents/%s/containers/watch", wsURL, agentID), nil)
|
||||
require.NoError(t, err)
|
||||
if resp.Body != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
|
||||
// And: Create a streaming decoder
|
||||
decoder := wsjson.NewDecoder[codersdk.WorkspaceAgentListContainersResponse](conn, websocket.MessageText, logger)
|
||||
defer decoder.Close()
|
||||
decodeCh := decoder.Chan()
|
||||
|
||||
// And: We can successfully send through the channel.
|
||||
testutil.RequireSend(ctx, t, containersCh, codersdk.WorkspaceAgentListContainersResponse{
|
||||
Containers: []codersdk.WorkspaceAgentContainer{{
|
||||
ID: "test-container-id",
|
||||
}},
|
||||
})
|
||||
|
||||
// And: Receive the data.
|
||||
containerResp := testutil.RequireReceive(ctx, t, decodeCh)
|
||||
require.Len(t, containerResp.Containers, 1)
|
||||
require.Equal(t, "test-container-id", containerResp.Containers[0].ID)
|
||||
|
||||
// When: We close the `containersCh`
|
||||
close(containersCh)
|
||||
|
||||
// Then: We expect `decodeCh` to be closed.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fail()
|
||||
|
||||
case _, ok := <-decodeCh:
|
||||
require.False(t, ok, "channel is expected to be closed")
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -593,7 +593,7 @@ func TestWorkspaceAgentTailnet(t *testing.T) {
|
||||
_ = agenttest.New(t, client.URL, r.AgentToken)
|
||||
resources := coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID)
|
||||
|
||||
conn, err := func() (*workspacesdk.AgentConn, error) {
|
||||
conn, err := func() (workspacesdk.AgentConn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel() // Connection should remain open even if the dial context is canceled.
|
||||
|
||||
@@ -1574,82 +1574,6 @@ func TestWatchWorkspaceAgentDevcontainers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("PayloadTooLarge", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitSuperLong)
|
||||
logger = slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
|
||||
mClock = quartz.NewMock(t)
|
||||
updaterTickerTrap = mClock.Trap().TickerFunc("updaterLoop")
|
||||
mCtrl = gomock.NewController(t)
|
||||
mCCLI = acmock.NewMockContainerCLI(mCtrl)
|
||||
|
||||
client, db = coderdtest.NewWithDatabase(t, &coderdtest.Options{Logger: &logger})
|
||||
user = coderdtest.CreateFirstUser(t, client)
|
||||
r = dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
OrganizationID: user.OrganizationID,
|
||||
OwnerID: user.UserID,
|
||||
}).WithAgent(func(agents []*proto.Agent) []*proto.Agent {
|
||||
return agents
|
||||
}).Do()
|
||||
)
|
||||
|
||||
// WebSocket limit is 4MiB, so we want to ensure we create _more_ than 4MiB worth of payload.
|
||||
// Creating 20,000 fake containers creates a payload of roughly 7MiB.
|
||||
var fakeContainers []codersdk.WorkspaceAgentContainer
|
||||
for range 20_000 {
|
||||
fakeContainers = append(fakeContainers, codersdk.WorkspaceAgentContainer{
|
||||
CreatedAt: time.Now(),
|
||||
ID: uuid.NewString(),
|
||||
FriendlyName: uuid.NewString(),
|
||||
Image: "busybox:latest",
|
||||
Labels: map[string]string{
|
||||
agentcontainers.DevcontainerLocalFolderLabel: "/home/coder/project",
|
||||
agentcontainers.DevcontainerConfigFileLabel: "/home/coder/project/.devcontainer/devcontainer.json",
|
||||
},
|
||||
Running: false,
|
||||
Ports: []codersdk.WorkspaceAgentContainerPort{},
|
||||
Status: string(codersdk.WorkspaceAgentDevcontainerStatusRunning),
|
||||
Volumes: map[string]string{},
|
||||
})
|
||||
}
|
||||
|
||||
mCCLI.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{Containers: fakeContainers}, nil)
|
||||
mCCLI.EXPECT().DetectArchitecture(gomock.Any(), gomock.Any()).Return("<none>", nil).AnyTimes()
|
||||
|
||||
_ = agenttest.New(t, client.URL, r.AgentToken, func(o *agent.Options) {
|
||||
o.Logger = logger.Named("agent")
|
||||
o.Devcontainers = true
|
||||
o.DevcontainerAPIOptions = []agentcontainers.Option{
|
||||
agentcontainers.WithClock(mClock),
|
||||
agentcontainers.WithContainerCLI(mCCLI),
|
||||
agentcontainers.WithWatcher(watcher.NewNoop()),
|
||||
}
|
||||
})
|
||||
|
||||
resources := coderdtest.NewWorkspaceAgentWaiter(t, client, r.Workspace.ID).Wait()
|
||||
require.Len(t, resources, 1, "expected one resource")
|
||||
require.Len(t, resources[0].Agents, 1, "expected one agent")
|
||||
agentID := resources[0].Agents[0].ID
|
||||
|
||||
updaterTickerTrap.MustWait(ctx).MustRelease(ctx)
|
||||
defer updaterTickerTrap.Close()
|
||||
|
||||
containers, closer, err := client.WatchWorkspaceAgentContainers(ctx, agentID)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
closer.Close()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fail()
|
||||
case _, ok := <-containers:
|
||||
require.False(t, ok)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestWorkspaceAgentRecreateDevcontainer(t *testing.T) {
|
||||
@@ -2497,7 +2421,7 @@ func TestWorkspaceAgent_UpdatedDERP(t *testing.T) {
|
||||
agentID := resources[0].Agents[0].ID
|
||||
|
||||
// Connect from a client.
|
||||
conn1, err := func() (*workspacesdk.AgentConn, error) {
|
||||
conn1, err := func() (workspacesdk.AgentConn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel() // Connection should remain open even if the dial context is canceled.
|
||||
|
||||
@@ -2538,7 +2462,7 @@ func TestWorkspaceAgent_UpdatedDERP(t *testing.T) {
|
||||
|
||||
// Wait for the DERP map to be updated on the existing client.
|
||||
require.Eventually(t, func() bool {
|
||||
regionIDs := conn1.Conn.DERPMap().RegionIDs()
|
||||
regionIDs := conn1.TailnetConn().DERPMap().RegionIDs()
|
||||
return len(regionIDs) == 1 && regionIDs[0] == 2
|
||||
}, testutil.WaitLong, testutil.IntervalFast)
|
||||
|
||||
@@ -2555,7 +2479,7 @@ func TestWorkspaceAgent_UpdatedDERP(t *testing.T) {
|
||||
defer conn2.Close()
|
||||
ok = conn2.AwaitReachable(ctx)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, []int{2}, conn2.DERPMap().RegionIDs())
|
||||
require.Equal(t, []int{2}, conn2.TailnetConn().DERPMap().RegionIDs())
|
||||
}
|
||||
|
||||
func TestWorkspaceAgentExternalAuthListen(t *testing.T) {
|
||||
|
||||
@@ -74,7 +74,7 @@ type AgentProvider interface {
|
||||
ReverseProxy(targetURL, dashboardURL *url.URL, agentID uuid.UUID, app appurl.ApplicationURL, wildcardHost string) *httputil.ReverseProxy
|
||||
|
||||
// AgentConn returns a new connection to the specified agent.
|
||||
AgentConn(ctx context.Context, agentID uuid.UUID) (_ *workspacesdk.AgentConn, release func(), _ error)
|
||||
AgentConn(ctx context.Context, agentID uuid.UUID) (_ workspacesdk.AgentConn, release func(), _ error)
|
||||
|
||||
ServeHTTPDebug(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user