chore: add Tunneler FSM and partial impl (#23691)

<!--

If you have used AI to produce some or all of this PR, please ensure you
have read our [AI Contribution
guidelines](https://coder.com/docs/about/contributing/AI_CONTRIBUTING)
before submitting.

-->

Adds the Tunneler state machine and logic for handling build updates.   
  
This is a partial implementation with tests. Further PRs will fill out
the other event types.
  
Relates to GRU-18
This commit is contained in:
Spike Curtis
2026-03-27 08:52:13 -04:00
committed by GitHub
parent 9e33035631
commit 9b4d15db9b
3 changed files with 609 additions and 0 deletions
+4
View File
@@ -19,6 +19,10 @@ const (
WorkspaceTransitionDelete WorkspaceTransition = "delete" WorkspaceTransitionDelete WorkspaceTransition = "delete"
) )
// WorkspaceTransitionEnums returns the start, stop and delete transitions, in that order.
func WorkspaceTransitionEnums() []WorkspaceTransition {
	return []WorkspaceTransition{
		WorkspaceTransitionStart,
		WorkspaceTransitionStop,
		WorkspaceTransitionDelete,
	}
}
type WorkspaceStatus string type WorkspaceStatus string
const ( const (
+344
View File
@@ -0,0 +1,344 @@
package tunneler
import (
"context"
"fmt"
"io"
"sync"
"github.com/google/uuid"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
)
// state identifies the Tunneler's current position in its state machine.
type state int
// NetworkedApplication is the application that runs on top of the tailnet tunnel.
type NetworkedApplication interface {
	// Closer is used to gracefully tear down the application prior to stopping the tunnel. The Tunneler calls Close
	// from its own goroutine and only logs any error it returns.
	io.Closer
	// Start the NetworkedApplication, using the provided AgentConn to connect.
	Start(conn workspacesdk.AgentConn)
}
// WorkspaceStarter is used to create a start build of the workspace. It is an interface here because the CLI has lots
// of complex logic for determining the build parameters including prompting and environment variables, which we don't
// want to burden the Tunneler with. Other users of the Tunneler like `scaletest` can have a much simpler
// implementation.
type WorkspaceStarter interface {
	// StartWorkspace requests a start build. The Tunneler calls it from its own goroutine; a non-nil error is logged,
	// written to Config.LogWriter when one is set, and then handled like a shutdown signal.
	StartWorkspace() error
}
// States of the Tunneler state machine. stateInit is the zero value, so a Tunneler whose state is never set starts
// there.
const (
	stateInit state = iota
	// exit is terminal: the event loop stops once this state is reached.
	exit
	// waitToStart is entered when a stop build is pending or running and autostart is allowed.
	waitToStart
	// waitForWorkspaceStarted is entered when a start build is pending or running, or after the Tunneler has asked
	// the WorkspaceStarter to start the workspace.
	waitForWorkspaceStarted
	// waitForAgent is entered when a start build has succeeded and no tailnet connection is in progress.
	waitForAgent
	// NOTE(review): nothing in the handlers implemented so far transitions into establishTailnet, tailnetUp or
	// applicationUp; their names suggest dialing, connected, and application running respectively - confirm once
	// the remaining handlers land.
	establishTailnet
	tailnetUp
	applicationUp
	// shutdownApplication is entered once a goroutine has been launched to close the NetworkedApplication.
	shutdownApplication
	// shutdownTailnet is entered once a goroutine has been launched to close the agent connection.
	shutdownTailnet
	maxState // used for testing: one past the last real state, so tests can range over every state
)
// Tunneler runs a state machine that reacts to tunnelerEvents (build updates, shutdown signals, and so on) for a
// single workspace. Event handling happens on the event loop goroutine started by NewTunneler.
type Tunneler struct {
	config Config
	// ctx ends when we successfully gracefully shut down or are forced closed; cancel ends it.
	ctx    context.Context
	cancel context.CancelFunc
	client *workspacesdk.Client
	// state is the current state of the state machine; the event handlers read and update it.
	state state
	// agentConn is the connection closed when the tailnet is shut down.
	agentConn workspacesdk.AgentConn
	// events delivers events to the event loop. NewTunneler creates it unbuffered, so senders also select on ctx.
	events chan tunnelerEvent
	// wg tracks every goroutine started on behalf of the Tunneler.
	wg sync.WaitGroup
}
// Config holds the settings for a Tunneler. The fields under "Required" must be set by the caller; the remaining
// fields may be left at their zero values.
type Config struct {
	// Required
	WorkspaceID      uuid.UUID
	App              NetworkedApplication
	WorkspaceStarter WorkspaceStarter
	// Optional:
	// AgentName is the name of the agent to tunnel to. If blank, assumes workspace has only one agent and will cause
	// an error if that is not the case.
	AgentName string
	// NoAutostart can be set to true to prevent the tunneler from automatically starting the workspace.
	NoAutostart bool
	// NoWaitForScripts can be set to true to cause the tunneler to dial as soon as the agent is up, not waiting for
	// nominally blocking startup scripts.
	NoWaitForScripts bool
	// LogWriter is used to write progress logs (build, scripts, etc) if non-nil.
	LogWriter io.Writer
	// DebugLogger is used for logging internal messages and errors for debugging (e.g. in tests)
	DebugLogger slog.Logger
}
// tunnelerEvent is an event relevant to setting up a tunnel. Exactly ONE of the fields is non-nil per event to allow
// explicit ordering. The event loop checks the fields in the order they are declared here and dispatches on the first
// non-nil one.
type tunnelerEvent struct {
	shutdownSignal    *shutdownSignal
	buildUpdate       *buildUpdate
	provisionerJobLog *codersdk.ProvisionerJobLog
	agentUpdate       *agentUpdate
	agentLog          *codersdk.WorkspaceAgentLog
	appUpdate         *networkedApplicationUpdate
	tailnetUpdate     *tailnetUpdate
}
// shutdownSignal is the event payload asking the Tunneler to shut down. It carries no data.
type shutdownSignal struct{}
// buildUpdate is the event payload describing a workspace build: which transition it performs and the status of its
// provisioner job.
type buildUpdate struct {
	transition codersdk.WorkspaceTransition
	jobStatus  codersdk.ProvisionerJobStatus
}
// agentUpdate is the event payload for agent updates. It is currently empty; the intended fields are commented out
// until they are used.
type agentUpdate struct {
	// TODO: commented out to appease linter
	// transition codersdk.WorkspaceTransition
	// id         uuid.UUID
}
// networkedApplicationUpdate is the event payload reporting whether the NetworkedApplication is running.
type networkedApplicationUpdate struct {
	// up is true if the application is up. False if it is down.
	up bool
}
// tailnetUpdate is the event payload reporting whether the tailnet is up.
type tailnetUpdate struct {
	// up is true if the tailnet is up. False if it is down.
	up bool
}
// NewTunneler builds a Tunneler for the given client and config and immediately launches its two background
// goroutines (start and the event loop), both tracked by the Tunneler's WaitGroup.
func NewTunneler(client *workspacesdk.Client, config Config) *Tunneler {
	// This context ends when we successfully gracefully shut down or are forced closed.
	ctx, cancel := context.WithCancel(context.Background())
	t := &Tunneler{
		config: config,
		ctx:    ctx,
		cancel: cancel,
		client: client,
		events: make(chan tunnelerEvent),
	}

	t.wg.Add(2)
	go t.start()
	go t.eventLoop()
	return t
}
// start is a placeholder for subscribing to workspace/agent updates. For now it only marks its WaitGroup entry as
// done and returns.
func (t *Tunneler) start() {
	defer t.wg.Done()
	// here we would subscribe to updates.
	// t.client.AgentConnectionWatch(t.config.WorkspaceID, t.config.AgentName)
}
// eventLoop receives events one at a time and hands each to the matching handler until the state machine reaches
// exit or the Tunneler's context ends. An event's fields are checked in a fixed order and only the first non-nil one
// is handled; an event with no field set is ignored.
func (t *Tunneler) eventLoop() {
	defer t.wg.Done()
	for t.state != exit {
		select {
		case <-t.ctx.Done():
			t.state = exit
			return
		case event := <-t.events:
			switch {
			case event.shutdownSignal != nil:
				t.handleSignal()
			case event.buildUpdate != nil:
				t.handleBuildUpdate(event.buildUpdate)
			case event.provisionerJobLog != nil:
				t.handleProvisionerJobLog(event.provisionerJobLog)
			case event.agentUpdate != nil:
				t.handleAgentUpdate(event.agentUpdate)
			case event.agentLog != nil:
				t.handleAgentLog(event.agentLog)
			case event.appUpdate != nil:
				t.handleAppUpdate(event.appUpdate)
			case event.tailnetUpdate != nil:
				t.handleTailnetUpdate(event.tailnetUpdate)
			}
		}
	}
}
// handleSignal begins shutting down from whatever state the Tunneler is in. Teardown work runs in a goroutine
// tracked by the WaitGroup; the state is updated right after the goroutine is launched.
func (t *Tunneler) handleSignal() {
	switch t.state {
	case stateInit, waitToStart, waitForWorkspaceStarted, waitForAgent:
		// Nothing to tear down yet.
		t.cancel() // stops the watch
		t.state = exit
	case establishTailnet:
		t.wg.Add(1)
		go t.shutdownTailnet()
		t.state = shutdownTailnet
	case tailnetUp, applicationUp:
		t.wg.Add(1)
		go t.closeApp()
		t.state = shutdownApplication
	case shutdownApplication, shutdownTailnet, exit:
		// Already shutting down or finished: nothing to do.
	default:
		t.config.DebugLogger.Critical(t.ctx, "missing case in handleSignal()", slog.F("state", t.state))
	}
}
// handleBuildUpdate advances the state machine in response to a workspace build update.
//
// Updates are ignored while shutting down or after exit. A delete transition, or a job status other than pending,
// running or succeeded, is handled exactly like a shutdown signal. Otherwise the outcome depends on the transition:
//   - start: an in-progress build tears down anything already connecting or connected, and otherwise moves to
//     waitForWorkspaceStarted; a succeeded build moves to waitForAgent unless we are already connecting or connected.
//   - stop: anything connecting or connected is torn down; otherwise, with NoAutostart set we exit, a succeeded
//     build triggers a workspace start, and an in-progress build moves to waitToStart.
//
// Any other combination is logged at critical level.
func (t *Tunneler) handleBuildUpdate(update *buildUpdate) {
	switch t.state {
	case shutdownTailnet, shutdownApplication, exit:
		return // no-op
	}

	status := update.jobStatus
	inProgress := status == codersdk.ProvisionerJobPending || status == codersdk.ProvisionerJobRunning
	succeeded := status == codersdk.ProvisionerJobSucceeded

	if update.transition == codersdk.WorkspaceTransitionDelete {
		t.config.DebugLogger.Info(t.ctx, "workspace is being deleted", slog.F("job_status", status))
		// treat same as signal
		t.handleSignal()
		return
	}
	if !inProgress && !succeeded {
		// Failed, canceled, or any other status we cannot make progress from.
		t.config.DebugLogger.Info(t.ctx, "build job is in unhealthy state", slog.F("job_status", status))
		// treat same as signal
		t.handleSignal()
		return
	}

	switch update.transition {
	case codersdk.WorkspaceTransitionStart:
		switch {
		case inProgress:
			t.config.DebugLogger.Debug(t.ctx, "workspace is starting", slog.F("job_status", status))
			switch t.state {
			case establishTailnet:
				// new build after we're already connecting
				t.wg.Add(1)
				go t.shutdownTailnet()
				t.state = shutdownTailnet
			case applicationUp, tailnetUp:
				// new build after we have already connected
				t.wg.Add(1)
				go t.closeApp()
				t.state = shutdownApplication
			default:
				t.state = waitForWorkspaceStarted
			}
			return
		case succeeded:
			t.config.DebugLogger.Debug(t.ctx, "workspace is started", slog.F("job_status", status))
			switch t.state {
			case establishTailnet, applicationUp, tailnetUp:
				// no-op. Later agent updates will tell us whether the tailnet connection is current.
			default:
				t.state = waitForAgent
			}
			return
		}

	case codersdk.WorkspaceTransitionStop:
		// these cases take effect regardless of whether the transition is complete or not
		switch t.state {
		case establishTailnet:
			// new build after we're already connecting
			t.wg.Add(1)
			go t.shutdownTailnet()
			t.state = shutdownTailnet
			return
		case applicationUp, tailnetUp:
			// new build after we have already connected
			t.wg.Add(1)
			go t.closeApp()
			t.state = shutdownApplication
			return
		}
		if t.config.NoAutostart {
			// we are stopped/stopping and configured not to automatically start. Nothing more to do.
			t.cancel()
			t.state = exit
			return
		}
		if succeeded {
			switch t.state {
			case stateInit, waitToStart, waitForAgent:
				t.wg.Add(1)
				go t.startWorkspace()
				t.state = waitForWorkspaceStarted
			case waitForWorkspaceStarted:
				// a start has already been requested or is underway
			default:
				// unhittable because all the states where we have started already or are shutting down are handled
				// earlier
				t.config.DebugLogger.Critical(t.ctx, "unhandled build update while stopped", slog.F("state", t.state))
			}
			return
		}
		if inProgress {
			t.state = waitToStart
			return
		}
	}

	// unhittable
	t.config.DebugLogger.Critical(t.ctx, "unhandled build update",
		slog.F("job_status", status), slog.F("transition", update.transition), slog.F("state", t.state))
}
// handleProvisionerJobLog is a stub: provisioner job log events are currently ignored.
func (*Tunneler) handleProvisionerJobLog(*codersdk.ProvisionerJobLog) {
}
// handleAgentUpdate is a stub: agent update events are currently ignored.
func (*Tunneler) handleAgentUpdate(*agentUpdate) {
}
// handleAgentLog is a stub: agent log events are currently ignored.
func (*Tunneler) handleAgentLog(*codersdk.WorkspaceAgentLog) {
}
// handleAppUpdate is a stub: networked application update events are currently ignored.
func (*Tunneler) handleAppUpdate(*networkedApplicationUpdate) {
}
// handleTailnetUpdate is a stub: tailnet update events are currently ignored.
func (*Tunneler) handleTailnetUpdate(*tailnetUpdate) {
}
// closeApp closes the networked application and then reports it as down to the event loop. A Close error is only
// logged. If the Tunneler's context ends before the event can be delivered, the event is dropped. Callers must
// wg.Add(1) before launching this in a goroutine.
func (t *Tunneler) closeApp() {
	defer t.wg.Done()

	if closeErr := t.config.App.Close(); closeErr != nil {
		t.config.DebugLogger.Error(t.ctx, "failed to close networked application", slog.Error(closeErr))
	}

	appDown := tunnelerEvent{appUpdate: &networkedApplicationUpdate{up: false}}
	select {
	case t.events <- appDown:
	case <-t.ctx.Done():
		t.config.DebugLogger.Info(t.ctx, "context expired before sending app down")
	}
}
// startWorkspace asks the configured WorkspaceStarter to start the workspace. On success there is nothing more to
// do here. On failure the error is logged, written to the LogWriter when one is configured, and a shutdown signal is
// sent to the event loop unless the context ends first. Callers must wg.Add(1) before launching this in a goroutine.
func (t *Tunneler) startWorkspace() {
	defer t.wg.Done()

	startErr := t.config.WorkspaceStarter.StartWorkspace()
	if startErr == nil {
		return
	}

	t.config.DebugLogger.Error(t.ctx, "failed to start workspace", slog.Error(startErr))
	if w := t.config.LogWriter; w != nil {
		// Best effort: a failure to write the progress log is not actionable here.
		_, _ = fmt.Fprintf(w, "failed to start workspace: %s", startErr.Error())
	}

	shutdown := tunnelerEvent{shutdownSignal: &shutdownSignal{}}
	select {
	case t.events <- shutdown:
	case <-t.ctx.Done():
		t.config.DebugLogger.Info(t.ctx, "context expired before sending signal after failed workspace start")
	}
}
// shutdownTailnet closes the agent connection, if there is one, and then reports the tailnet as down to the event
// loop. A Close error is only logged. If the Tunneler's context ends before the event can be delivered, the event is
// dropped. Callers must wg.Add(1) before launching this in a goroutine.
func (t *Tunneler) shutdownTailnet() {
	defer t.wg.Done()

	// The handlers launch this while the tailnet is still being established, when no connection may have been
	// stored yet. Calling Close on a nil interface would panic, so skip it in that case.
	if t.agentConn != nil {
		if err := t.agentConn.Close(); err != nil {
			t.config.DebugLogger.Error(t.ctx, "failed to close agent connection", slog.Error(err))
		}
	}

	select {
	case <-t.ctx.Done():
		t.config.DebugLogger.Debug(t.ctx, "context expired before sending event after shutting down tailnet")
	case t.events <- tunnelerEvent{tailnetUpdate: &tailnetUpdate{up: false}}:
	}
}
@@ -0,0 +1,261 @@
package tunneler
import (
"context"
"fmt"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/codersdk/workspacesdk/agentconnmock"
"github.com/coder/coder/v2/testutil"
)
// TestHandleBuildUpdate_Coverage ensures that we handle all possible initial states in combination with build updates.
// TestHandleBuildUpdate_Coverage feeds every combination of initial state, transition, job status and the two boolean
// config flags into handleBuildUpdate, checking that none of them is left unhandled and no goroutine is leaked.
func TestHandleBuildUpdate_Coverage(t *testing.T) {
	t.Parallel()

	wsID := uuid.UUID{1}
	for initial := range maxState {
		for _, transition := range codersdk.WorkspaceTransitionEnums() {
			for _, status := range codersdk.ProvisionerJobStatusEnums() {
				for _, noAutostart := range []bool{true, false} {
					for _, noWaitForScripts := range []bool{true, false} {
						name := fmt.Sprintf("%d_%s_%s_%t_%t", initial, transition, status, noAutostart, noWaitForScripts)
						t.Run(name, func(t *testing.T) {
							t.Parallel()

							ctrl := gomock.NewController(t)
							conn := agentconnmock.NewMockAgentConn(ctrl)
							conn.EXPECT().Close().Return(nil).AnyTimes()
							logger := testutil.Logger(t)
							testCtx := testutil.Context(t, testutil.WaitShort)
							ctx, cancel := context.WithCancel(testCtx)

							cfg := Config{
								WorkspaceID:      wsID,
								App:              fakeApp{},
								WorkspaceStarter: &fakeWorkspaceStarter{},
								AgentName:        "test",
								NoAutostart:      noAutostart,
								NoWaitForScripts: noWaitForScripts,
								DebugLogger:      logger.Named("tunneler"),
							}
							uut := &Tunneler{
								config:    cfg,
								events:    make(chan tunnelerEvent),
								ctx:       ctx,
								cancel:    cancel,
								state:     initial,
								agentConn: conn,
							}

							uut.handleBuildUpdate(&buildUpdate{transition: transition, jobStatus: status})

							// Cancel in case the update triggered a goroutine that is trying to write another
							// event, then make sure no goroutine is leaked.
							cancel()
							waitForGoroutines(testCtx, t, uut)

							// We're not asserting the resulting state, as there are just too many to directly
							// enumerate due to the combinations. Unhandled cases will hit a critical log in the
							// handler and fail the test.
							require.Less(t, uut.state, maxState)
							require.GreaterOrEqual(t, uut.state, 0)
						})
					}
				}
			}
		}
	}
}
func TestBuildUpdatesStoppedWorkspace(t *testing.T) {
t.Parallel()
workspaceID := uuid.UUID{1}
logger := testutil.Logger(t)
fWorkspaceStarter := fakeWorkspaceStarter{}
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
uut := &Tunneler{
config: Config{
WorkspaceID: workspaceID,
App: fakeApp{},
WorkspaceStarter: &fWorkspaceStarter,
AgentName: "test",
DebugLogger: logger.Named("tunneler"),
},
events: make(chan tunnelerEvent),
ctx: ctx,
cancel: cancel,
state: stateInit,
}
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobPending})
require.Equal(t, waitToStart, uut.state)
waitForGoroutines(testCtx, t, uut)
require.False(t, fWorkspaceStarter.started)
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobRunning})
require.Equal(t, waitToStart, uut.state)
waitForGoroutines(testCtx, t, uut)
require.False(t, fWorkspaceStarter.started)
// when stop job succeeds, we start the workspace
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobSucceeded})
require.Equal(t, waitForWorkspaceStarted, uut.state)
waitForGoroutines(testCtx, t, uut)
require.True(t, fWorkspaceStarter.started)
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobPending})
require.Equal(t, waitForWorkspaceStarted, uut.state)
waitForGoroutines(testCtx, t, uut)
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobRunning})
require.Equal(t, waitForWorkspaceStarted, uut.state)
waitForGoroutines(testCtx, t, uut)
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobSucceeded})
require.Equal(t, waitForAgent, uut.state)
waitForGoroutines(testCtx, t, uut)
}
func TestBuildUpdatesNewBuildWhileWaiting(t *testing.T) {
t.Parallel()
workspaceID := uuid.UUID{1}
logger := testutil.Logger(t)
fWorkspaceStarter := fakeWorkspaceStarter{}
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
uut := &Tunneler{
config: Config{
WorkspaceID: workspaceID,
App: fakeApp{},
WorkspaceStarter: &fWorkspaceStarter,
AgentName: "test",
DebugLogger: logger.Named("tunneler"),
},
events: make(chan tunnelerEvent),
ctx: ctx,
cancel: cancel,
state: waitForAgent,
}
// New build comes in while we are waiting for the agent to start. We roll back to waiting for the workspace to start.
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobRunning})
require.Equal(t, waitForWorkspaceStarted, uut.state)
waitForGoroutines(testCtx, t, uut)
require.False(t, fWorkspaceStarter.started)
}
// TestBuildUpdatesBadJobs checks that a stop build reporting a failed, canceling, canceled or unknown job makes the
// Tunneler exit and cancel its context without starting the workspace.
func TestBuildUpdatesBadJobs(t *testing.T) {
	t.Parallel()

	badStatuses := []codersdk.ProvisionerJobStatus{
		codersdk.ProvisionerJobFailed,
		codersdk.ProvisionerJobCanceling,
		codersdk.ProvisionerJobCanceled,
		codersdk.ProvisionerJobUnknown,
	}
	for _, badStatus := range badStatuses {
		t.Run(string(badStatus), func(t *testing.T) {
			t.Parallel()

			logger := testutil.Logger(t)
			starter := &fakeWorkspaceStarter{}
			testCtx := testutil.Context(t, testutil.WaitShort)
			ctx, cancel := context.WithCancel(testCtx)
			uut := &Tunneler{
				config: Config{
					WorkspaceID:      uuid.UUID{1},
					App:              fakeApp{},
					WorkspaceStarter: starter,
					AgentName:        "test",
					DebugLogger:      logger.Named("tunneler"),
				},
				events: make(chan tunnelerEvent),
				ctx:    ctx,
				cancel: cancel,
				state:  stateInit,
			}

			// A healthy running start build first moves us to waitForWorkspaceStarted.
			running := &buildUpdate{
				transition: codersdk.WorkspaceTransitionStart,
				jobStatus:  codersdk.ProvisionerJobRunning,
			}
			uut.handleBuildUpdate(running)
			require.Equal(t, waitForWorkspaceStarted, uut.state)
			waitForGoroutines(testCtx, t, uut)
			require.False(t, starter.started)

			// The unhealthy job then makes us exit.
			bad := &buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: badStatus}
			uut.handleBuildUpdate(bad)
			require.Equal(t, exit, uut.state)
			waitForGoroutines(testCtx, t, uut)
			require.False(t, starter.started)

			// should cancel
			require.Error(t, ctx.Err())
		})
	}
}
func TestBuildUpdatesNoAutostart(t *testing.T) {
t.Parallel()
workspaceID := uuid.UUID{1}
logger := testutil.Logger(t)
fWorkspaceStarter := fakeWorkspaceStarter{}
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
uut := &Tunneler{
config: Config{
WorkspaceID: workspaceID,
App: fakeApp{},
WorkspaceStarter: &fWorkspaceStarter,
AgentName: "test",
NoAutostart: true,
DebugLogger: logger.Named("tunneler"),
},
events: make(chan tunnelerEvent),
ctx: ctx,
cancel: cancel,
state: stateInit,
}
// when stop job succeeds, we exit because autostart is disabled
uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobSucceeded})
require.Equal(t, exit, uut.state)
waitForGoroutines(testCtx, t, uut)
require.False(t, fWorkspaceStarter.started)
// should cancel
require.Error(t, ctx.Err())
}
// waitForGoroutines waits until every goroutine tracked by the tunneler's WaitGroup has finished, or until ctx
// ends, whichever comes first. The wait itself runs in a separate goroutine so that ctx can bound it.
func waitForGoroutines(ctx context.Context, t *testing.T, tunneler *Tunneler) {
	// Mark this as a helper so that a failure is reported at the calling test's line rather than in here.
	t.Helper()

	done := make(chan struct{})
	go func() {
		defer close(done)
		tunneler.wg.Wait()
	}()
	_ = testutil.TryReceive(ctx, t, done)
}
// fakeWorkspaceStarter is a test WorkspaceStarter that only records whether it was asked to start the workspace.
type fakeWorkspaceStarter struct {
	// started is set to true by StartWorkspace. It is written without locking, so tests read it only after
	// waiting for the Tunneler's goroutines to finish.
	started bool
}

// StartWorkspace records the call and always succeeds.
func (f *fakeWorkspaceStarter) StartWorkspace() error {
	f.started = true
	return nil
}
// fakeApp is a test NetworkedApplication whose methods do nothing.
type fakeApp struct{}

// Close always succeeds.
func (fakeApp) Close() error {
	return nil
}

// Start ignores the connection and does nothing.
func (fakeApp) Start(workspacesdk.AgentConn) {}