chore: support agent updates in tunneler (#23730)

<!--

If you have used AI to produce some or all of this PR, please ensure you have read our [AI Contribution guidelines](https://coder.com/docs/about/contributing/AI_CONTRIBUTING) before submitting.

-->

relates to GRU-18

Adds support for agent updates to the Tunneler
This commit is contained in:
Spike Curtis
2026-03-30 13:50:06 -04:00
committed by GitHub
parent 3cc31de57a
commit ef3aade647
2 changed files with 257 additions and 70 deletions
+109 -30
View File
@@ -31,17 +31,35 @@ type WorkspaceStarter interface {
StartWorkspace() error
}
type Client interface {
DialAgent(dialCtx context.Context, agentID uuid.UUID, options *workspacesdk.DialAgentOptions) (workspacesdk.AgentConn, error)
}
const (
// stateInit is the initial state of the FSM.
stateInit state = iota
// exit is the final state of the FSM, and implies that everything is closed or closing.
exit
// waitToStart means the workspace is in a state where we have to wait before we can create a new start build
waitToStart
// waitForWorkspaceStarted means the workspace is starting, or we have kicked off a goroutine to start it
waitForWorkspaceStarted
// waitForAgent means the workspace has started and we are waiting for the agent to connect or be ready
waitForAgent
// establishTailnet means we have kicked off a goroutine to dial the agent and are waiting for its results
establishTailnet
// tailnetUp means the tailnet connection came up and we kicked off a goroutine to start the NetworkedApplication.
tailnetUp
// applicationUp means the NetworkedApplication is up.
applicationUp
// shutdownApplication means we are in graceful shut down and waiting for the NetworkedApplication. It could be
// starting or closing, and we expect to get a networkedApplicationUpdate event when it does.
shutdownApplication
// shutdownTailnet means that we are in graceful shut down and waiting for the tailnet. This implies the
// NetworkedApplication is status is down. E.g. closed or was never started.
shutdownTailnet
// maxState is not a valid state for the FSM, and must be last in this list. It allows tests to iterate over all
// valid states using `range maxState`.
maxState // used for testing
)
@@ -49,7 +67,7 @@ type Tunneler struct {
config Config
ctx context.Context
cancel context.CancelFunc
client *workspacesdk.Client
client Client
state state
agentConn workspacesdk.AgentConn
events chan tunnelerEvent
@@ -98,22 +116,24 @@ type buildUpdate struct {
}
type agentUpdate struct {
// TODO: commented out to appease linter
// transition codersdk.WorkspaceTransition
// id uuid.UUID
lifecycle codersdk.WorkspaceAgentLifecycle
id uuid.UUID
}
type networkedApplicationUpdate struct {
// up is true if the application is up. False if it is down.
up bool
up bool
err error
}
type tailnetUpdate struct {
// up is true if the tailnet is up. False if it is down.
up bool
up bool
conn workspacesdk.AgentConn
err error
}
func NewTunneler(client *workspacesdk.Client, config Config) *Tunneler {
func NewTunneler(client Client, config Config) *Tunneler {
t := &Tunneler{
config: config,
client: client,
@@ -166,13 +186,17 @@ func (t *Tunneler) handleSignal() {
switch t.state {
case exit, shutdownTailnet, shutdownApplication:
return
case tailnetUp, applicationUp:
case applicationUp:
t.wg.Add(1)
go t.closeApp()
t.state = shutdownApplication
case tailnetUp:
// waiting for app to start; setting state here will cause us to tear it down when the app start goroutine
// event comes in.
t.state = shutdownApplication
case establishTailnet:
t.wg.Add(1)
go t.shutdownTailnet()
// waiting for tailnet to start; setting state here will cause us to tear it down when the tailnet dial
// goroutine event comes in.
t.state = shutdownTailnet
case stateInit, waitToStart, waitForWorkspaceStarted, waitForAgent:
t.cancel() // stops the watch
@@ -212,13 +236,12 @@ func (t *Tunneler) handleBuildUpdate(update *buildUpdate) {
if update.transition == codersdk.WorkspaceTransitionStart && canMakeProgress {
t.config.DebugLogger.Debug(t.ctx, "workspace is starting", slog.F("job_status", update.jobStatus))
switch t.state {
case establishTailnet:
// new build after we're already connecting
t.wg.Add(1)
go t.shutdownTailnet()
// new build after we have already connected
case establishTailnet: // we are starting the tailnet
t.state = shutdownTailnet
case applicationUp, tailnetUp:
// new build after we have already connected
case tailnetUp: // we are starting the application
t.state = shutdownApplication
case applicationUp:
t.wg.Add(1)
go t.closeApp()
t.state = shutdownApplication
@@ -241,14 +264,14 @@ func (t *Tunneler) handleBuildUpdate(update *buildUpdate) {
if update.transition == codersdk.WorkspaceTransitionStop {
// these cases take effect regardless of whether the transition is complete or not
switch t.state {
case establishTailnet:
// new build after we're already connecting
t.wg.Add(1)
go t.shutdownTailnet()
// all 3 of these mean a new build after we have already started connecting
case establishTailnet: // waiting for tailnet to start
t.state = shutdownTailnet
return
case applicationUp, tailnetUp:
// new build after we have already connected
case tailnetUp: // waiting for application to start
t.state = shutdownApplication
return
case applicationUp:
t.wg.Add(1)
go t.closeApp()
t.state = shutdownApplication
@@ -289,7 +312,39 @@ func (t *Tunneler) handleBuildUpdate(update *buildUpdate) {
func (*Tunneler) handleProvisionerJobLog(*codersdk.ProvisionerJobLog) {
}
func (*Tunneler) handleAgentUpdate(*agentUpdate) {
func (t *Tunneler) handleAgentUpdate(update *agentUpdate) {
if t.state != waitForAgent {
return
}
doConnect := func() {
t.wg.Add(1)
t.state = establishTailnet
go t.connectTailnet(update.id)
}
// consequence of ignoring updates if we are not waiting for the agent is that we MUST receive
// the start build succeeded update BEFORE we get the Agent connected / ready update. We should keep this
// in mind when implementing the watch in Coderd.
switch update.lifecycle {
case codersdk.WorkspaceAgentLifecycleReady:
doConnect()
return
case codersdk.WorkspaceAgentLifecycleStarting,
codersdk.WorkspaceAgentLifecycleStartError,
codersdk.WorkspaceAgentLifecycleStartTimeout:
if t.config.NoWaitForScripts {
doConnect()
return
}
case codersdk.WorkspaceAgentLifecycleShuttingDown:
case codersdk.WorkspaceAgentLifecycleShutdownError:
case codersdk.WorkspaceAgentLifecycleShutdownTimeout:
case codersdk.WorkspaceAgentLifecycleOff:
case codersdk.WorkspaceAgentLifecycleCreated: // initial state, so it hasn't connected yet
default:
// unhittable, unless new states are added. We structure this with the switch and all cases covered to ensure
// we cover all cases.
t.config.DebugLogger.Critical(t.ctx, "unhandled agent update", slog.F("lifecycle", update.lifecycle))
}
}
func (*Tunneler) handleAgentLog(*codersdk.WorkspaceAgentLog) {
@@ -310,7 +365,7 @@ func (t *Tunneler) closeApp() {
select {
case <-t.ctx.Done():
t.config.DebugLogger.Info(t.ctx, "context expired before sending app down")
case t.events <- tunnelerEvent{appUpdate: &networkedApplicationUpdate{up: false}}:
case t.events <- tunnelerEvent{appUpdate: &networkedApplicationUpdate{up: false, err: err}}:
}
}
@@ -325,20 +380,44 @@ func (t *Tunneler) startWorkspace() {
select {
case <-t.ctx.Done():
t.config.DebugLogger.Info(t.ctx, "context expired before sending signal after failed workspace start")
case t.events <- tunnelerEvent{shutdownSignal: &shutdownSignal{}}:
case t.events <- tunnelerEvent{appUpdate: &networkedApplicationUpdate{up: false}}:
}
}
}
func (t *Tunneler) shutdownTailnet() {
func (t *Tunneler) connectTailnet(id uuid.UUID) {
defer t.wg.Done()
err := t.agentConn.Close()
conn, err := t.client.DialAgent(t.ctx, id, &workspacesdk.DialAgentOptions{
Logger: t.config.DebugLogger.Named("dialer"),
})
if err != nil {
t.config.DebugLogger.Error(t.ctx, "failed to close agent connection", slog.Error(err))
t.config.DebugLogger.Error(t.ctx, "failed to connect agent", slog.Error(err))
if t.config.LogWriter != nil {
_, _ = fmt.Fprintf(t.config.LogWriter, "failed to dial workspace agent: %s", err.Error())
}
select {
case <-t.ctx.Done():
t.config.DebugLogger.Info(t.ctx, "context expired before sending event after failed agent dial")
case t.events <- tunnelerEvent{tailnetUpdate: &tailnetUpdate{up: false, err: err}}:
}
}
select {
case <-t.ctx.Done():
t.config.DebugLogger.Debug(t.ctx, "context expired before sending event after shutting down tailnet")
case t.events <- tunnelerEvent{tailnetUpdate: &tailnetUpdate{up: false}}:
t.config.DebugLogger.Info(t.ctx, "context expired before sending tailnet conn")
case t.events <- tunnelerEvent{tailnetUpdate: &tailnetUpdate{up: true, conn: conn}}:
}
}
// TODO: Restore this func when we implement tearing down the tailnet
// func (t *Tunneler) shutdownTailnet() {
// defer t.wg.Done()
// err := t.agentConn.Close()
// if err != nil {
// t.config.DebugLogger.Error(t.ctx, "failed to close agent connection", slog.Error(err))
// }
// select {
// case <-t.ctx.Done():
// t.config.DebugLogger.Debug(t.ctx, "context expired before sending event after shutting down tailnet")
// case t.events <- tunnelerEvent{tailnetUpdate: &tailnetUpdate{up: false, err: err}}:
// }
//}
@@ -27,46 +27,9 @@ func TestHandleBuildUpdate_Coverage(t *testing.T) {
for _, noWaitForScripts := range []bool{true, false} {
t.Run(fmt.Sprintf("%d_%s_%s_%t_%t", s, trans, jobStatus, noAutostart, noWaitForScripts), func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
mAgentConn := agentconnmock.NewMockAgentConn(ctrl)
logger := testutil.Logger(t)
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
uut := &Tunneler{
config: Config{
WorkspaceID: workspaceID,
App: fakeApp{},
WorkspaceStarter: &fakeWorkspaceStarter{},
AgentName: "test",
NoAutostart: noAutostart,
NoWaitForScripts: noWaitForScripts,
DebugLogger: logger.Named("tunneler"),
},
events: make(chan tunnelerEvent),
ctx: ctx,
cancel: cancel,
state: s,
agentConn: mAgentConn,
}
mAgentConn.EXPECT().Close().Return(nil).AnyTimes()
uut.handleBuildUpdate(&buildUpdate{transition: trans, jobStatus: jobStatus})
done := make(chan struct{})
go func() {
defer close(done)
uut.wg.Wait()
}()
cancel() // cancel in case the update triggers a go routine that writes another event
// ensure we don't leak a go routine
_ = testutil.TryReceive(testCtx, t, done)
// We're not asserting the resulting state, as there are just too many to directly enumerate
// due to the combinations. Unhandled cases will hit a critical log in the handler and fail
// the test.
require.Less(t, uut.state, maxState)
require.GreaterOrEqual(t, uut.state, 0)
coverUpdate(t, workspaceID, noAutostart, noWaitForScripts, s, func(uut *Tunneler) {
uut.handleBuildUpdate(&buildUpdate{transition: trans, jobStatus: jobStatus})
})
})
}
}
@@ -75,6 +38,51 @@ func TestHandleBuildUpdate_Coverage(t *testing.T) {
}
}
func coverUpdate(t *testing.T, workspaceID uuid.UUID, noAutostart bool, noWaitForScripts bool, s state, update func(uut *Tunneler)) {
ctrl := gomock.NewController(t)
mAgentConn := agentconnmock.NewMockAgentConn(ctrl)
logger := testutil.Logger(t)
fClient := &fakeClient{conn: mAgentConn}
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
uut := &Tunneler{
client: fClient,
config: Config{
WorkspaceID: workspaceID,
App: fakeApp{},
WorkspaceStarter: &fakeWorkspaceStarter{},
AgentName: "test",
NoAutostart: noAutostart,
NoWaitForScripts: noWaitForScripts,
DebugLogger: logger.Named("tunneler"),
},
events: make(chan tunnelerEvent),
ctx: ctx,
cancel: cancel,
state: s,
agentConn: mAgentConn,
}
mAgentConn.EXPECT().Close().Return(nil).AnyTimes()
update(uut)
done := make(chan struct{})
go func() {
defer close(done)
uut.wg.Wait()
}()
cancel() // cancel in case the update triggers a go routine that writes another event
// ensure we don't leak a go routine
_ = testutil.TryReceive(testCtx, t, done)
// We're not asserting the resulting state, as there are just too many to directly enumerate
// due to the combinations. Unhandled cases will hit a critical log in the handler and fail
// the test.
require.Less(t, uut.state, maxState)
require.GreaterOrEqual(t, uut.state, 0)
}
func TestBuildUpdatesStoppedWorkspace(t *testing.T) {
t.Parallel()
workspaceID := uuid.UUID{1}
@@ -234,6 +242,96 @@ func TestBuildUpdatesNoAutostart(t *testing.T) {
require.Error(t, ctx.Err())
}
func TestAgentUpdate_Coverage(t *testing.T) {
t.Parallel()
workspaceID := uuid.UUID{1}
agentID := uuid.UUID{2}
for s := range maxState {
for _, lifecycle := range codersdk.WorkspaceAgentLifecycleOrder {
for _, noAutostart := range []bool{true, false} {
for _, noWaitForScripts := range []bool{true, false} {
t.Run(fmt.Sprintf("%d_%s_%t_%t", s, lifecycle, noAutostart, noWaitForScripts), func(t *testing.T) {
t.Parallel()
coverUpdate(t, workspaceID, noAutostart, noWaitForScripts, s, func(uut *Tunneler) {
uut.handleAgentUpdate(&agentUpdate{lifecycle: lifecycle, id: agentID})
})
})
}
}
}
}
}
func TestAgentUpdateReady(t *testing.T) {
t.Parallel()
workspaceID := uuid.UUID{1}
agentID := uuid.UUID{2}
logger := testutil.Logger(t)
ctrl := gomock.NewController(t)
mAgentConn := agentconnmock.NewMockAgentConn(ctrl)
fClient := &fakeClient{conn: mAgentConn}
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
uut := &Tunneler{
config: Config{
WorkspaceID: workspaceID,
AgentName: "test",
DebugLogger: logger.Named("tunneler"),
},
events: make(chan tunnelerEvent),
ctx: ctx,
cancel: cancel,
state: waitForAgent,
client: fClient,
}
uut.handleAgentUpdate(&agentUpdate{lifecycle: codersdk.WorkspaceAgentLifecycleReady, id: agentID})
require.Equal(t, establishTailnet, uut.state)
event := testutil.RequireReceive(testCtx, t, uut.events)
require.NotNil(t, event.tailnetUpdate)
require.True(t, fClient.dialed)
require.Equal(t, mAgentConn, event.tailnetUpdate.conn)
require.True(t, event.tailnetUpdate.up)
}
func TestAgentUpdateNoWait(t *testing.T) {
t.Parallel()
workspaceID := uuid.UUID{1}
agentID := uuid.UUID{2}
logger := testutil.Logger(t)
ctrl := gomock.NewController(t)
mAgentConn := agentconnmock.NewMockAgentConn(ctrl)
fClient := &fakeClient{conn: mAgentConn}
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
uut := &Tunneler{
config: Config{
WorkspaceID: workspaceID,
AgentName: "test",
DebugLogger: logger.Named("tunneler"),
NoWaitForScripts: true,
},
events: make(chan tunnelerEvent),
ctx: ctx,
cancel: cancel,
state: waitForAgent,
client: fClient,
}
uut.handleAgentUpdate(&agentUpdate{lifecycle: codersdk.WorkspaceAgentLifecycleStarting, id: agentID})
require.Equal(t, establishTailnet, uut.state)
event := testutil.RequireReceive(testCtx, t, uut.events)
require.NotNil(t, event.tailnetUpdate)
require.True(t, fClient.dialed)
require.Equal(t, mAgentConn, event.tailnetUpdate.conn)
require.True(t, event.tailnetUpdate.up)
}
func waitForGoroutines(ctx context.Context, t *testing.T, tunneler *Tunneler) {
done := make(chan struct{})
go func() {
@@ -259,3 +357,13 @@ func (fakeApp) Close() error {
}
func (fakeApp) Start(workspacesdk.AgentConn) {}
type fakeClient struct {
conn workspacesdk.AgentConn
dialed bool
}
func (f *fakeClient) DialAgent(context.Context, uuid.UUID, *workspacesdk.DialAgentOptions) (workspacesdk.AgentConn, error) {
f.dialed = true
return f.conn, nil
}