From 9b4d15db9bd253d03794d9e2994ce84eac7f32fd Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Fri, 27 Mar 2026 08:52:13 -0400 Subject: [PATCH] chore: add Tunneler FSM and partial impl (#23691) Adds the Tunneler state machine and logic for handling build updates. This is a partial implementation and tests. Further PRs will fill out the other event types. Relates to GRU-18 --- codersdk/workspacebuilds.go | 4 + codersdk/workspacesdk/tunneler/tunneler.go | 344 ++++++++++++++++++ .../tunneler/tunneler_internal_test.go | 261 +++++++++++++ 3 files changed, 609 insertions(+) create mode 100644 codersdk/workspacesdk/tunneler/tunneler.go create mode 100644 codersdk/workspacesdk/tunneler/tunneler_internal_test.go diff --git a/codersdk/workspacebuilds.go b/codersdk/workspacebuilds.go index 6206539da0..518088575e 100644 --- a/codersdk/workspacebuilds.go +++ b/codersdk/workspacebuilds.go @@ -19,6 +19,10 @@ const ( WorkspaceTransitionDelete WorkspaceTransition = "delete" ) +func WorkspaceTransitionEnums() []WorkspaceTransition { + return []WorkspaceTransition{WorkspaceTransitionStart, WorkspaceTransitionStop, WorkspaceTransitionDelete} +} + type WorkspaceStatus string const ( diff --git a/codersdk/workspacesdk/tunneler/tunneler.go b/codersdk/workspacesdk/tunneler/tunneler.go new file mode 100644 index 0000000000..28e1f7bb94 --- /dev/null +++ b/codersdk/workspacesdk/tunneler/tunneler.go @@ -0,0 +1,344 @@ +package tunneler + +import ( + "context" + "fmt" + "io" + "sync" + + "github.com/google/uuid" + + "cdr.dev/slog/v3" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/workspacesdk" +) + +type state int + +// NetworkedApplication is the application that runs on top of the tailnet tunnel. +type NetworkedApplication interface { + // Closer is used to gracefully tear down the application prior to stopping the tunnel. + io.Closer + // Start the NetworkedApplication, using the provided AgentConn to connect. + Start(conn workspacesdk.AgentConn) +} + +// WorkspaceStarter is used to create a start build of the workspace. It is an interface here because the CLI has lots +// of complex logic for determining the build parameters including prompting and environment variables, which we don't +// want to burden the Tunneler with. Other users of the Tunneler like `scaletest` can have a much simpler +// implementation. +type WorkspaceStarter interface { + StartWorkspace() error +} + +const ( + stateInit state = iota + exit + waitToStart + waitForWorkspaceStarted + waitForAgent + establishTailnet + tailnetUp + applicationUp + shutdownApplication + shutdownTailnet + maxState // used for testing +) + +type Tunneler struct { + config Config + ctx context.Context + cancel context.CancelFunc + client *workspacesdk.Client + state state + agentConn workspacesdk.AgentConn + events chan tunnelerEvent + wg sync.WaitGroup +} + +type Config struct { + // Required + WorkspaceID uuid.UUID + App NetworkedApplication + WorkspaceStarter WorkspaceStarter + + // Optional: + + // AgentName is the name of the agent to tunnel to. If blank, assumes workspace has only one agent and will cause + // an error if that is not the case. + AgentName string + // NoAutostart can be set to true to prevent the tunneler from automatically starting the workspace. + NoAutostart bool + // NoWaitForScripts can be set to true to cause the tunneler to dial as soon as the agent is up, not waiting for + // nominally blocking startup scripts. + NoWaitForScripts bool + // LogWriter is used to write progress logs (build, scripts, etc) if non-nil. + LogWriter io.Writer + // DebugLogger is used for logging internal messages and errors for debugging (e.g. in tests) + DebugLogger slog.Logger +} + +// tunnelerEvent is an event relevant to setting up a tunnel. ONE of the fields is non-null per event to allow explicit +// ordering. +type tunnelerEvent struct { + shutdownSignal *shutdownSignal + buildUpdate *buildUpdate + provisionerJobLog *codersdk.ProvisionerJobLog + agentUpdate *agentUpdate + agentLog *codersdk.WorkspaceAgentLog + appUpdate *networkedApplicationUpdate + tailnetUpdate *tailnetUpdate +} + +type shutdownSignal struct{} + +type buildUpdate struct { + transition codersdk.WorkspaceTransition + jobStatus codersdk.ProvisionerJobStatus +} + +type agentUpdate struct { + // TODO: commented out to appease linter + // transition codersdk.WorkspaceTransition + // id uuid.UUID +} + +type networkedApplicationUpdate struct { + // up is true if the application is up. False if it is down. + up bool +} + +type tailnetUpdate struct { + // up is true if the tailnet is up. False if it is down. + up bool +} + +func NewTunneler(client *workspacesdk.Client, config Config) *Tunneler { + t := &Tunneler{ + config: config, + client: client, + events: make(chan tunnelerEvent), + } + // this context ends when we successfully gracefully shut down or are forced closed. + t.ctx, t.cancel = context.WithCancel(context.Background()) + t.wg.Add(2) + go t.start() + go t.eventLoop() + return t +} + +func (t *Tunneler) start() { + defer t.wg.Done() + // here we would subscribe to updates. + // t.client.AgentConnectionWatch(t.config.WorkspaceID, t.config.AgentName) +} + +func (t *Tunneler) eventLoop() { + defer t.wg.Done() + for t.state != exit { + var e tunnelerEvent + select { + case <-t.ctx.Done(): + t.state = exit + return + case e = <-t.events: + } + switch { + case e.shutdownSignal != nil: + t.handleSignal() + case e.buildUpdate != nil: + t.handleBuildUpdate(e.buildUpdate) + case e.provisionerJobLog != nil: + t.handleProvisionerJobLog(e.provisionerJobLog) + case e.agentUpdate != nil: + t.handleAgentUpdate(e.agentUpdate) + case e.agentLog != nil: + t.handleAgentLog(e.agentLog) + case e.appUpdate != nil: + t.handleAppUpdate(e.appUpdate) + case e.tailnetUpdate != nil: + t.handleTailnetUpdate(e.tailnetUpdate) + } + } +} + +func (t *Tunneler) handleSignal() { + switch t.state { + case exit, shutdownTailnet, shutdownApplication: + return + case tailnetUp, applicationUp: + t.wg.Add(1) + go t.closeApp() + t.state = shutdownApplication + case establishTailnet: + t.wg.Add(1) + go t.shutdownTailnet() + t.state = shutdownTailnet + case stateInit, waitToStart, waitForWorkspaceStarted, waitForAgent: + t.cancel() // stops the watch + t.state = exit + default: + t.config.DebugLogger.Critical(t.ctx, "missing case in handleSignal()", slog.F("state", t.state)) + } +} + +func (t *Tunneler) handleBuildUpdate(update *buildUpdate) { + if t.state == shutdownTailnet || t.state == shutdownApplication || t.state == exit { + return // no-op + } + + var canMakeProgress, jobUnhealthy bool + switch update.jobStatus { + case codersdk.ProvisionerJobPending, codersdk.ProvisionerJobRunning: + canMakeProgress = true + case codersdk.ProvisionerJobSucceeded: + default: + jobUnhealthy = true + } + + if update.transition == codersdk.WorkspaceTransitionDelete { + t.config.DebugLogger.Info(t.ctx, "workspace is being deleted", slog.F("job_status", update.jobStatus)) + // treat same as signal + t.handleSignal() + return + } + if jobUnhealthy { + t.config.DebugLogger.Info(t.ctx, "build job is in unhealthy state", slog.F("job_status", update.jobStatus)) + // treat same as signal + t.handleSignal() + return + } + + if update.transition == codersdk.WorkspaceTransitionStart && canMakeProgress { + t.config.DebugLogger.Debug(t.ctx, "workspace is starting", slog.F("job_status", update.jobStatus)) + switch t.state { + case establishTailnet: + // new build after we're already connecting + t.wg.Add(1) + go t.shutdownTailnet() + t.state = shutdownTailnet + case applicationUp, tailnetUp: + // new build after we have already connected + t.wg.Add(1) + go t.closeApp() + t.state = shutdownApplication + default: + t.state = waitForWorkspaceStarted + } + return + } + if update.transition == codersdk.WorkspaceTransitionStart && update.jobStatus == codersdk.ProvisionerJobSucceeded { + t.config.DebugLogger.Debug(t.ctx, "workspace is started", slog.F("job_status", update.jobStatus)) + switch t.state { + case establishTailnet, applicationUp, tailnetUp: + // no-op. Later agent updates will tell us whether the tailnet connection is current. + default: + t.state = waitForAgent + } + return + } + + if update.transition == codersdk.WorkspaceTransitionStop { + // these cases take effect regardless of whether the transition is complete or not + switch t.state { + case establishTailnet: + // new build after we're already connecting + t.wg.Add(1) + go t.shutdownTailnet() + t.state = shutdownTailnet + return + case applicationUp, tailnetUp: + // new build after we have already connected + t.wg.Add(1) + go t.closeApp() + t.state = shutdownApplication + return + } + if t.config.NoAutostart { + // we are stopped/stopping and configured not to automatically start. Nothing more to do. + t.cancel() + t.state = exit + return + } + if update.jobStatus == codersdk.ProvisionerJobSucceeded { + switch t.state { + case stateInit, waitToStart, waitForAgent: + t.wg.Add(1) + go t.startWorkspace() + t.state = waitForWorkspaceStarted + return + case waitForWorkspaceStarted: + return + default: + // unhittable because all the states where we have started already or are shutting down are handled + // earlier + t.config.DebugLogger.Critical(t.ctx, "unhandled build update while stopped", slog.F("state", t.state)) + return + } + } + if canMakeProgress { + t.state = waitToStart + return + } + } + // unhittable + t.config.DebugLogger.Critical(t.ctx, "unhandled build update", + slog.F("job_status", update.jobStatus), slog.F("transition", update.transition), slog.F("state", t.state)) +} + +func (*Tunneler) handleProvisionerJobLog(*codersdk.ProvisionerJobLog) { +} + +func (*Tunneler) handleAgentUpdate(*agentUpdate) { +} + +func (*Tunneler) handleAgentLog(*codersdk.WorkspaceAgentLog) { +} + +func (*Tunneler) handleAppUpdate(*networkedApplicationUpdate) { +} + +func (*Tunneler) handleTailnetUpdate(*tailnetUpdate) { +} + +func (t *Tunneler) closeApp() { + defer t.wg.Done() + err := t.config.App.Close() + if err != nil { + t.config.DebugLogger.Error(t.ctx, "failed to close networked application", slog.Error(err)) + } + select { + case <-t.ctx.Done(): + t.config.DebugLogger.Info(t.ctx, "context expired before sending app down") + case t.events <- tunnelerEvent{appUpdate: &networkedApplicationUpdate{up: false}}: + } +} + +func (t *Tunneler) startWorkspace() { + defer t.wg.Done() + err := t.config.WorkspaceStarter.StartWorkspace() + if err != nil { + t.config.DebugLogger.Error(t.ctx, "failed to start workspace", slog.Error(err)) + if t.config.LogWriter != nil { + _, _ = fmt.Fprintf(t.config.LogWriter, "failed to start workspace: %s", err.Error()) + } + select { + case <-t.ctx.Done(): + t.config.DebugLogger.Info(t.ctx, "context expired before sending signal after failed workspace start") + case t.events <- tunnelerEvent{shutdownSignal: &shutdownSignal{}}: + } + } +} + +func (t *Tunneler) shutdownTailnet() { + defer t.wg.Done() + err := t.agentConn.Close() + if err != nil { + t.config.DebugLogger.Error(t.ctx, "failed to close agent connection", slog.Error(err)) + } + select { + case <-t.ctx.Done(): + t.config.DebugLogger.Debug(t.ctx, "context expired before sending event after shutting down tailnet") + case t.events <- tunnelerEvent{tailnetUpdate: &tailnetUpdate{up: false}}: + } +} diff --git a/codersdk/workspacesdk/tunneler/tunneler_internal_test.go b/codersdk/workspacesdk/tunneler/tunneler_internal_test.go new file mode 100644 index 0000000000..75e88be2e9 --- /dev/null +++ b/codersdk/workspacesdk/tunneler/tunneler_internal_test.go @@ -0,0 +1,261 @@ +package tunneler + +import ( + "context" + "fmt" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/workspacesdk" + "github.com/coder/coder/v2/codersdk/workspacesdk/agentconnmock" + "github.com/coder/coder/v2/testutil" +) + +// TestHandleBuildUpdate_Coverage ensures that we handle all possible initial states in combination with build updates. +func TestHandleBuildUpdate_Coverage(t *testing.T) { + t.Parallel() + workspaceID := uuid.UUID{1} + + for s := range maxState { + for _, trans := range codersdk.WorkspaceTransitionEnums() { + for _, jobStatus := range codersdk.ProvisionerJobStatusEnums() { + for _, noAutostart := range []bool{true, false} { + for _, noWaitForScripts := range []bool{true, false} { + t.Run(fmt.Sprintf("%d_%s_%s_%t_%t", s, trans, jobStatus, noAutostart, noWaitForScripts), func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + mAgentConn := agentconnmock.NewMockAgentConn(ctrl) + logger := testutil.Logger(t) + + testCtx := testutil.Context(t, testutil.WaitShort) + ctx, cancel := context.WithCancel(testCtx) + uut := &Tunneler{ + config: Config{ + WorkspaceID: workspaceID, + App: fakeApp{}, + WorkspaceStarter: &fakeWorkspaceStarter{}, + AgentName: "test", + NoAutostart: noAutostart, + NoWaitForScripts: noWaitForScripts, + DebugLogger: logger.Named("tunneler"), + }, + events: make(chan tunnelerEvent), + ctx: ctx, + cancel: cancel, + state: s, + agentConn: mAgentConn, + } + + mAgentConn.EXPECT().Close().Return(nil).AnyTimes() + + uut.handleBuildUpdate(&buildUpdate{transition: trans, jobStatus: jobStatus}) + done := make(chan struct{}) + go func() { + defer close(done) + uut.wg.Wait() + }() + cancel() // cancel in case the update triggers a go routine that writes another event + // ensure we don't leak a go routine + _ = testutil.TryReceive(testCtx, t, done) + + // We're not asserting the resulting state, as there are just too many to directly enumerate + // due to the combinations. Unhandled cases will hit a critical log in the handler and fail + // the test. + require.Less(t, uut.state, maxState) + require.GreaterOrEqual(t, uut.state, 0) + }) + } + } + } + } + } +} + +func TestBuildUpdatesStoppedWorkspace(t *testing.T) { + t.Parallel() + workspaceID := uuid.UUID{1} + logger := testutil.Logger(t) + fWorkspaceStarter := fakeWorkspaceStarter{} + + testCtx := testutil.Context(t, testutil.WaitShort) + ctx, cancel := context.WithCancel(testCtx) + uut := &Tunneler{ + config: Config{ + WorkspaceID: workspaceID, + App: fakeApp{}, + WorkspaceStarter: &fWorkspaceStarter, + AgentName: "test", + DebugLogger: logger.Named("tunneler"), + }, + events: make(chan tunnelerEvent), + ctx: ctx, + cancel: cancel, + state: stateInit, + } + + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobPending}) + require.Equal(t, waitToStart, uut.state) + waitForGoroutines(testCtx, t, uut) + require.False(t, fWorkspaceStarter.started) + + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobRunning}) + require.Equal(t, waitToStart, uut.state) + waitForGoroutines(testCtx, t, uut) + require.False(t, fWorkspaceStarter.started) + + // when stop job succeeds, we start the workspace + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobSucceeded}) + require.Equal(t, waitForWorkspaceStarted, uut.state) + waitForGoroutines(testCtx, t, uut) + require.True(t, fWorkspaceStarter.started) + + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobPending}) + require.Equal(t, waitForWorkspaceStarted, uut.state) + waitForGoroutines(testCtx, t, uut) + + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobRunning}) + require.Equal(t, waitForWorkspaceStarted, uut.state) + waitForGoroutines(testCtx, t, uut) + + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobSucceeded}) + require.Equal(t, waitForAgent, uut.state) + waitForGoroutines(testCtx, t, uut) +} + +func TestBuildUpdatesNewBuildWhileWaiting(t *testing.T) { + t.Parallel() + workspaceID := uuid.UUID{1} + logger := testutil.Logger(t) + fWorkspaceStarter := fakeWorkspaceStarter{} + + testCtx := testutil.Context(t, testutil.WaitShort) + ctx, cancel := context.WithCancel(testCtx) + uut := &Tunneler{ + config: Config{ + WorkspaceID: workspaceID, + App: fakeApp{}, + WorkspaceStarter: &fWorkspaceStarter, + AgentName: "test", + DebugLogger: logger.Named("tunneler"), + }, + events: make(chan tunnelerEvent), + ctx: ctx, + cancel: cancel, + state: waitForAgent, + } + + // New build comes in while we are waiting for the agent to start. We roll back to waiting for the workspace to start. + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobRunning}) + require.Equal(t, waitForWorkspaceStarted, uut.state) + waitForGoroutines(testCtx, t, uut) + require.False(t, fWorkspaceStarter.started) +} + +func TestBuildUpdatesBadJobs(t *testing.T) { + t.Parallel() + for _, jobStatus := range []codersdk.ProvisionerJobStatus{ + codersdk.ProvisionerJobFailed, + codersdk.ProvisionerJobCanceling, + codersdk.ProvisionerJobCanceled, + codersdk.ProvisionerJobUnknown, + } { + t.Run(string(jobStatus), func(t *testing.T) { + t.Parallel() + workspaceID := uuid.UUID{1} + logger := testutil.Logger(t) + fWorkspaceStarter := fakeWorkspaceStarter{} + + testCtx := testutil.Context(t, testutil.WaitShort) + ctx, cancel := context.WithCancel(testCtx) + uut := &Tunneler{ + config: Config{ + WorkspaceID: workspaceID, + App: fakeApp{}, + WorkspaceStarter: &fWorkspaceStarter, + AgentName: "test", + DebugLogger: logger.Named("tunneler"), + }, + events: make(chan tunnelerEvent), + ctx: ctx, + cancel: cancel, + state: stateInit, + } + + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStart, jobStatus: codersdk.ProvisionerJobRunning}) + require.Equal(t, waitForWorkspaceStarted, uut.state) + waitForGoroutines(testCtx, t, uut) + require.False(t, fWorkspaceStarter.started) + + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: jobStatus}) + require.Equal(t, exit, uut.state) + waitForGoroutines(testCtx, t, uut) + require.False(t, fWorkspaceStarter.started) + + // should cancel + require.Error(t, ctx.Err()) + }) + } +} + +func TestBuildUpdatesNoAutostart(t *testing.T) { + t.Parallel() + workspaceID := uuid.UUID{1} + logger := testutil.Logger(t) + fWorkspaceStarter := fakeWorkspaceStarter{} + + testCtx := testutil.Context(t, testutil.WaitShort) + ctx, cancel := context.WithCancel(testCtx) + uut := &Tunneler{ + config: Config{ + WorkspaceID: workspaceID, + App: fakeApp{}, + WorkspaceStarter: &fWorkspaceStarter, + AgentName: "test", + NoAutostart: true, + DebugLogger: logger.Named("tunneler"), + }, + events: make(chan tunnelerEvent), + ctx: ctx, + cancel: cancel, + state: stateInit, + } + + // when stop job succeeds, we exit because autostart is disabled + uut.handleBuildUpdate(&buildUpdate{transition: codersdk.WorkspaceTransitionStop, jobStatus: codersdk.ProvisionerJobSucceeded}) + require.Equal(t, exit, uut.state) + waitForGoroutines(testCtx, t, uut) + require.False(t, fWorkspaceStarter.started) + + // should cancel + require.Error(t, ctx.Err()) +} + +func waitForGoroutines(ctx context.Context, t *testing.T, tunneler *Tunneler) { + done := make(chan struct{}) + go func() { + defer close(done) + tunneler.wg.Wait() + }() + _ = testutil.TryReceive(ctx, t, done) +} + +type fakeWorkspaceStarter struct { + started bool +} + +func (f *fakeWorkspaceStarter) StartWorkspace() error { + f.started = true + return nil +} + +type fakeApp struct{} + +func (fakeApp) Close() error { + return nil +} + +func (fakeApp) Start(workspacesdk.AgentConn) {}