diff --git a/coderd/jobreaper/detector_test.go b/coderd/jobreaper/detector_test.go index 1f0df05e4f..ff5b221be8 100644 --- a/coderd/jobreaper/detector_test.go +++ b/coderd/jobreaper/detector_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,6 +21,7 @@ import ( "github.com/coder/coder/v2/coderd/database/dbfake" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/jobreaper" "github.com/coder/coder/v2/coderd/provisionerdserver" "github.com/coder/coder/v2/coderd/rbac" @@ -31,48 +33,101 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m, testutil.GoleakOptions...) } +// detectorTestEnv provides common infrastructure for jobreaper detector tests, +// reducing the repeated setup/teardown boilerplate across every test function. +type detectorTestEnv struct { + t *testing.T + DB database.Store + Pubsub pubsub.Pubsub + detector *jobreaper.Detector + tickCh chan time.Time + statsCh chan jobreaper.Stats +} + +// newDetectorTestEnv creates a new test environment with a started detector. +func newDetectorTestEnv(ctx context.Context, t *testing.T) *detectorTestEnv { + t.Helper() + db, ps := dbtestutil.NewDB(t) + log := testutil.Logger(t) + tickCh := make(chan time.Time) + statsCh := make(chan jobreaper.Stats) + + detector := jobreaper.New(ctx, wrapDBAuthz(db, log), ps, log, tickCh).WithStatsChannel(statsCh) + detector.Start() + + return &detectorTestEnv{ + t: t, + DB: db, + Pubsub: ps, + detector: detector, + tickCh: tickCh, + statsCh: statsCh, + } +} + +// tick sends a tick with the given time and returns the stats from the +// detector run. It respects context cancellation to avoid blocking forever +// if the detector exits unexpectedly. +// +// tick must not be called from a separate goroutine, as it calls +// require.FailNow which uses runtime.Goexit under the hood. +func (e *detectorTestEnv) tick(ctx context.Context, now time.Time) jobreaper.Stats { + e.t.Helper() + testutil.RequireSend(ctx, e.t, e.tickCh, now) + return testutil.RequireReceive(ctx, e.t, e.statsCh) +} + +// close stops the detector and waits for it to finish. +func (e *detectorTestEnv) close() { + e.detector.Close() + e.detector.Wait() +} + +// requireTerminatedJob asserts that a provisioner job was properly terminated +// by the job reaper with the expected reap type (hung or pending). +func requireTerminatedJob(ctx context.Context, t *testing.T, db database.Store, jobID uuid.UUID, now time.Time, reapType jobreaper.ReapType) { + t.Helper() + job, err := db.GetProvisionerJobByID(ctx, jobID) + require.NoError(t, err) + require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) + require.True(t, job.CompletedAt.Valid) + require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) + if reapType == jobreaper.Pending { + require.True(t, job.StartedAt.Valid) + require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second) + } + require.True(t, job.Error.Valid) + require.Contains(t, job.Error.String, fmt.Sprintf("Build has been detected as %s", reapType)) + require.False(t, job.ErrorCode.Valid) +} + func TestDetectorNoJobs(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- time.Now() - - stats := <-statsCh + stats := env.tick(ctx, time.Now()) require.NoError(t, stats.Error) require.Empty(t, stats.TerminatedJobIDs) - - detector.Close() - detector.Wait() } func TestDetectorNoHungJobs(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() // Insert some jobs that are running and haven't been updated in a while, // but not enough to be considered hung. now := time.Now() - org := dbgen.Organization(t, db, database.Organization{}) - user := dbgen.User(t, db, database.User{}) - file := dbgen.File(t, db, database.File{}) + org := dbgen.Organization(t, env.DB, database.Organization{}) + user := dbgen.User(t, env.DB, database.User{}) + file := dbgen.File(t, env.DB, database.File{}) for i := 0; i < 5; i++ { - dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: now.Add(-time.Minute * 5), UpdatedAt: now.Add(-time.Minute * time.Duration(i)), StartedAt: sql.NullTime{ @@ -89,51 +144,40 @@ func TestDetectorNoHungJobs(t *testing.T) { }) } - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Empty(t, stats.TerminatedJobIDs) - - detector.Close() - detector.Wait() } func TestDetectorHungWorkspaceBuild(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() twentyMinAgo = now.Add(-time.Minute * 20) tenMinAgo = now.Add(-time.Minute * 10) sixMinAgo = now.Add(-time.Minute * 6) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`) ) // Previous build (completed successfully). - previousBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + previousBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{ OrganizationID: org.ID, OwnerID: user.ID, - }).Pubsub(pubsub).Seed(database.WorkspaceBuild{}). + }).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}). ProvisionerState(expectedWorkspaceBuildState). Succeeded(dbfake.WithJobCompletedAt(twentyMinAgo)). Do() // Current build (hung - running job with UpdatedAt > 5 min ago). - currentBuild := dbfake.WorkspaceBuild(t, db, previousBuild.Workspace). - Pubsub(pubsub). + currentBuild := dbfake.WorkspaceBuild(t, env.DB, previousBuild.Workspace). + Pubsub(env.Pubsub). Seed(database.WorkspaceBuild{BuildNumber: 2}). Starting(dbfake.WithJobStartedAt(tenMinAgo), dbfake.WithJobUpdatedAt(sixMinAgo)). Do() @@ -141,70 +185,52 @@ func TestDetectorHungWorkspaceBuild(t *testing.T) { t.Log("previous job ID: ", previousBuild.Build.JobID) t.Log("current job ID: ", currentBuild.Build.JobID) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0]) // Check that the current provisioner job was updated. - job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as hung") - require.False(t, job.ErrorCode.Valid) + requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung) // Check that the provisioner state was copied. - build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) + build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) require.NoError(t, err) - provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) + provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) require.NoError(t, err) require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState) - - detector.Close() - detector.Wait() } func TestDetectorHungWorkspaceBuildNoOverrideState(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() twentyMinAgo = now.Add(-time.Minute * 20) tenMinAgo = now.Add(-time.Minute * 10) sixMinAgo = now.Add(-time.Minute * 6) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`) ) // Previous build (completed successfully). - previousBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + previousBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{ OrganizationID: org.ID, OwnerID: user.ID, - }).Pubsub(pubsub).Seed(database.WorkspaceBuild{}). + }).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}). ProvisionerState([]byte(`{"dean":"NOT cool","colin":"also NOT cool"}`)). Succeeded(dbfake.WithJobCompletedAt(twentyMinAgo)). Do() // Current build (hung - running job with UpdatedAt > 5 min ago). // This build already has provisioner state, which should NOT be overridden. - currentBuild := dbfake.WorkspaceBuild(t, db, previousBuild.Workspace). - Pubsub(pubsub). + currentBuild := dbfake.WorkspaceBuild(t, env.DB, previousBuild.Workspace). + Pubsub(env.Pubsub). Seed(database.WorkspaceBuild{ BuildNumber: 2, }).ProvisionerState(expectedWorkspaceBuildState). @@ -214,159 +240,107 @@ func TestDetectorHungWorkspaceBuildNoOverrideState(t *testing.T) { t.Log("previous job ID: ", previousBuild.Build.JobID) t.Log("current job ID: ", currentBuild.Build.JobID) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0]) // Check that the current provisioner job was updated. - job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as hung") - require.False(t, job.ErrorCode.Valid) + requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung) // Check that the provisioner state was NOT copied. - build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) + build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) require.NoError(t, err) - provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) + provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) require.NoError(t, err) require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState) - - detector.Close() - detector.Wait() } func TestDetectorHungWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() tenMinAgo = now.Add(-time.Minute * 10) sixMinAgo = now.Add(-time.Minute * 6) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`) ) // First build (hung - no previous build exists). // This build has provisioner state, which should NOT be overridden. - currentBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + currentBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{ OrganizationID: org.ID, OwnerID: user.ID, - }).Pubsub(pubsub).Seed(database.WorkspaceBuild{}). + }).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}). ProvisionerState(expectedWorkspaceBuildState). Starting(dbfake.WithJobStartedAt(tenMinAgo), dbfake.WithJobUpdatedAt(sixMinAgo)). Do() t.Log("current job ID: ", currentBuild.Build.JobID) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0]) // Check that the current provisioner job was updated. - job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as hung") - require.False(t, job.ErrorCode.Valid) + requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung) // Check that the provisioner state was NOT updated. - build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) + build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) require.NoError(t, err) - provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) + provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) require.NoError(t, err) require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState) - - detector.Close() - detector.Wait() } func TestDetectorPendingWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() thirtyFiveMinAgo = now.Add(-time.Minute * 35) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`) ) // First build (hung pending - no previous build exists). // This build has provisioner state, which should NOT be overridden. - currentBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + currentBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{ OrganizationID: org.ID, OwnerID: user.ID, - }).Pubsub(pubsub).Seed(database.WorkspaceBuild{}). + }).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}). ProvisionerState(expectedWorkspaceBuildState). Pending(dbfake.WithJobCreatedAt(thirtyFiveMinAgo), dbfake.WithJobUpdatedAt(thirtyFiveMinAgo)). Do() t.Log("current job ID: ", currentBuild.Build.JobID) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0]) // Check that the current provisioner job was updated. - job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.StartedAt.Valid) - require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as pending") - require.False(t, job.ErrorCode.Valid) + requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Pending) // Check that the provisioner state was NOT updated. - build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) + build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID) require.NoError(t, err) - provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) + provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID) require.NoError(t, err) require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState) - - detector.Close() - detector.Wait() } // TestDetectorWorkspaceBuildForDormantWorkspace ensures that the jobreaper has @@ -378,34 +352,30 @@ func TestDetectorPendingWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testin func TestDetectorWorkspaceBuildForDormantWorkspace(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() tenMinAgo = now.Add(-time.Minute * 10) sixMinAgo = now.Add(-time.Minute * 6) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`) ) // First build (hung - running job with UpdatedAt > 5 min ago). // This build has provisioner state, which should NOT be overridden. // The workspace is dormant from the start. - currentBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + currentBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{ OrganizationID: org.ID, OwnerID: user.ID, DormantAt: sql.NullTime{ Time: now.Add(-time.Hour), Valid: true, }, - }).Pubsub(pubsub).Seed(database.WorkspaceBuild{}). + }).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}). ProvisionerState(expectedWorkspaceBuildState). Starting(dbfake.WithJobStartedAt(tenMinAgo), dbfake.WithJobUpdatedAt(sixMinAgo)). Do() @@ -416,50 +386,32 @@ func TestDetectorWorkspaceBuildForDormantWorkspace(t *testing.T) { // thing. require.Equal(t, rbac.ResourceWorkspaceDormant.Type, currentBuild.Workspace.RBACObject().Type) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0]) // Check that the current provisioner job was updated. - job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as hung") - require.False(t, job.ErrorCode.Valid) - - detector.Close() - detector.Wait() + requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung) } func TestDetectorHungOtherJobTypes(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() tenMinAgo = now.Add(-time.Minute * 10) sixMinAgo = now.Add(-time.Minute * 6) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) - file = dbgen.File(t, db, database.File{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) + file = dbgen.File(t, env.DB, database.File{}) // Template import job. - templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: tenMinAgo, UpdatedAt: sixMinAgo, StartedAt: sql.NullTime{ @@ -474,7 +426,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) { Type: database.ProvisionerJobTypeTemplateVersionImport, Input: []byte("{}"), }) - _ = dbgen.TemplateVersion(t, db, database.TemplateVersion{ + _ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{ OrganizationID: org.ID, JobID: templateImportJob.ID, CreatedBy: user.ID, @@ -482,7 +434,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) { ) // Template dry-run job. - dryRunVersion := dbgen.TemplateVersion(t, db, database.TemplateVersion{ + dryRunVersion := dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{ OrganizationID: org.ID, CreatedBy: user.ID, }) @@ -490,7 +442,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) { TemplateVersionID: dryRunVersion.ID, }) require.NoError(t, err) - templateDryRunJob := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + templateDryRunJob := dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: tenMinAgo, UpdatedAt: sixMinAgo, StartedAt: sql.NullTime{ @@ -509,60 +461,33 @@ func TestDetectorHungOtherJobTypes(t *testing.T) { t.Log("template import job ID: ", templateImportJob.ID) t.Log("template dry-run job ID: ", templateDryRunJob.ID) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 2) require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID) require.Contains(t, stats.TerminatedJobIDs, templateDryRunJob.ID) - // Check that the template import job was updated. - job, err := db.GetProvisionerJobByID(ctx, templateImportJob.ID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as hung") - require.False(t, job.ErrorCode.Valid) - - // Check that the template dry-run job was updated. - job, err = db.GetProvisionerJobByID(ctx, templateDryRunJob.ID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as hung") - require.False(t, job.ErrorCode.Valid) - - detector.Close() - detector.Wait() + // Check that both jobs were terminated as hung. + requireTerminatedJob(ctx, t, env.DB, templateImportJob.ID, now, jobreaper.Hung) + requireTerminatedJob(ctx, t, env.DB, templateDryRunJob.ID, now, jobreaper.Hung) } func TestDetectorPendingOtherJobTypes(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() thirtyFiveMinAgo = now.Add(-time.Minute * 35) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) - file = dbgen.File(t, db, database.File{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) + file = dbgen.File(t, env.DB, database.File{}) // Template import job. - templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: thirtyFiveMinAgo, UpdatedAt: thirtyFiveMinAgo, StartedAt: sql.NullTime{ @@ -577,7 +502,7 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) { Type: database.ProvisionerJobTypeTemplateVersionImport, Input: []byte("{}"), }) - _ = dbgen.TemplateVersion(t, db, database.TemplateVersion{ + _ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{ OrganizationID: org.ID, JobID: templateImportJob.ID, CreatedBy: user.ID, @@ -585,7 +510,7 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) { ) // Template dry-run job. - dryRunVersion := dbgen.TemplateVersion(t, db, database.TemplateVersion{ + dryRunVersion := dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{ OrganizationID: org.ID, CreatedBy: user.ID, }) @@ -593,7 +518,7 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) { TemplateVersionID: dryRunVersion.ID, }) require.NoError(t, err) - templateDryRunJob := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + templateDryRunJob := dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: thirtyFiveMinAgo, UpdatedAt: thirtyFiveMinAgo, StartedAt: sql.NullTime{ @@ -612,65 +537,34 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) { t.Log("template import job ID: ", templateImportJob.ID) t.Log("template dry-run job ID: ", templateDryRunJob.ID) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 2) require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID) require.Contains(t, stats.TerminatedJobIDs, templateDryRunJob.ID) - // Check that the template import job was updated. - job, err := db.GetProvisionerJobByID(ctx, templateImportJob.ID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.StartedAt.Valid) - require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as pending") - require.False(t, job.ErrorCode.Valid) - - // Check that the template dry-run job was updated. - job, err = db.GetProvisionerJobByID(ctx, templateDryRunJob.ID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.StartedAt.Valid) - require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as pending") - require.False(t, job.ErrorCode.Valid) - - detector.Close() - detector.Wait() + // Check that both jobs were terminated as pending. + requireTerminatedJob(ctx, t, env.DB, templateImportJob.ID, now, jobreaper.Pending) + requireTerminatedJob(ctx, t, env.DB, templateDryRunJob.ID, now, jobreaper.Pending) } func TestDetectorHungCanceledJob(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() tenMinAgo = now.Add(-time.Minute * 10) sixMinAgo = now.Add(-time.Minute * 6) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) - file = dbgen.File(t, db, database.File{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) + file = dbgen.File(t, env.DB, database.File{}) // Template import job. - templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: tenMinAgo, CanceledAt: sql.NullTime{ Time: tenMinAgo, @@ -689,7 +583,7 @@ func TestDetectorHungCanceledJob(t *testing.T) { Type: database.ProvisionerJobTypeTemplateVersionImport, Input: []byte("{}"), }) - _ = dbgen.TemplateVersion(t, db, database.TemplateVersion{ + _ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{ OrganizationID: org.ID, JobID: templateImportJob.ID, CreatedBy: user.ID, @@ -698,27 +592,13 @@ func TestDetectorHungCanceledJob(t *testing.T) { t.Log("template import job ID: ", templateImportJob.ID) - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID) // Check that the job was updated. - job, err := db.GetProvisionerJobByID(ctx, templateImportJob.ID) - require.NoError(t, err) - require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second) - require.True(t, job.CompletedAt.Valid) - require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second) - require.True(t, job.Error.Valid) - require.Contains(t, job.Error.String, "Build has been detected as hung") - require.False(t, job.ErrorCode.Valid) - - detector.Close() - detector.Wait() + requireTerminatedJob(ctx, t, env.DB, templateImportJob.ID, now, jobreaper.Hung) } func TestDetectorPushesLogs(t *testing.T) { @@ -753,24 +633,20 @@ func TestDetectorPushesLogs(t *testing.T) { t.Run(c.name, func(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() var ( now = time.Now() tenMinAgo = now.Add(-time.Minute * 10) sixMinAgo = now.Add(-time.Minute * 6) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) - file = dbgen.File(t, db, database.File{}) + org = dbgen.Organization(t, env.DB, database.Organization{}) + user = dbgen.User(t, env.DB, database.User{}) + file = dbgen.File(t, env.DB, database.File{}) // Template import job. - templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: tenMinAgo, UpdatedAt: sixMinAgo, StartedAt: sql.NullTime{ @@ -785,7 +661,7 @@ func TestDetectorPushesLogs(t *testing.T) { Type: database.ProvisionerJobTypeTemplateVersionImport, Input: []byte("{}"), }) - _ = dbgen.TemplateVersion(t, db, database.TemplateVersion{ + _ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{ OrganizationID: org.ID, JobID: templateImportJob.ID, CreatedBy: user.ID, @@ -806,17 +682,14 @@ func TestDetectorPushesLogs(t *testing.T) { insertParams.Source = append(insertParams.Source, database.LogSourceProvisioner) insertParams.Output = append(insertParams.Output, fmt.Sprintf("Output %d", i)) } - logs, err := db.InsertProvisionerJobLogs(ctx, insertParams) + logs, err := env.DB.InsertProvisionerJobLogs(ctx, insertParams) require.NoError(t, err) require.Len(t, logs, 10) } - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - // Create pubsub subscription to listen for new log events. pubsubCalled := make(chan int64, 1) - pubsubCancel, err := pubsub.Subscribe(provisionersdk.ProvisionerJobLogsNotifyChannel(templateImportJob.ID), func(ctx context.Context, message []byte) { + pubsubCancel, err := env.Pubsub.Subscribe(provisionersdk.ProvisionerJobLogsNotifyChannel(templateImportJob.ID), func(ctx context.Context, message []byte) { defer close(pubsubCalled) var event provisionersdk.ProvisionerJobLogsNotifyMessage err := json.Unmarshal(message, &event) @@ -830,9 +703,7 @@ func TestDetectorPushesLogs(t *testing.T) { require.NoError(t, err) defer pubsubCancel() - tickCh <- now - - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID) @@ -841,7 +712,7 @@ func TestDetectorPushesLogs(t *testing.T) { // Get the jobs after the given time and check that they are what we // expect. - logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{ + logs, err := env.DB.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{ JobID: templateImportJob.ID, CreatedAfter: after, }) @@ -862,15 +733,12 @@ func TestDetectorPushesLogs(t *testing.T) { } // Double check the full log count. - logs, err = db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{ + logs, err = env.DB.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{ JobID: templateImportJob.ID, CreatedAfter: 0, }) require.NoError(t, err) require.Len(t, logs, c.preLogCount+len(expectedLogs)) - - detector.Close() - detector.Wait() }) } } @@ -878,21 +746,18 @@ func TestDetectorPushesLogs(t *testing.T) { func TestDetectorMaxJobsPerRun(t *testing.T) { t.Parallel() - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, pubsub = dbtestutil.NewDB(t) - log = testutil.Logger(t) - tickCh = make(chan time.Time) - statsCh = make(chan jobreaper.Stats) - org = dbgen.Organization(t, db, database.Organization{}) - user = dbgen.User(t, db, database.User{}) - file = dbgen.File(t, db, database.File{}) - ) + ctx := testutil.Context(t, testutil.WaitLong) + env := newDetectorTestEnv(ctx, t) + defer env.close() + + org := dbgen.Organization(t, env.DB, database.Organization{}) + user := dbgen.User(t, env.DB, database.User{}) + file := dbgen.File(t, env.DB, database.File{}) // Create MaxJobsPerRun + 1 hung jobs. now := time.Now() for i := 0; i < jobreaper.MaxJobsPerRun+1; i++ { - pj := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{ + pj := dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{ CreatedAt: now.Add(-time.Hour), UpdatedAt: now.Add(-time.Hour), StartedAt: sql.NullTime{ @@ -907,31 +772,23 @@ func TestDetectorMaxJobsPerRun(t *testing.T) { Type: database.ProvisionerJobTypeTemplateVersionImport, Input: []byte("{}"), }) - _ = dbgen.TemplateVersion(t, db, database.TemplateVersion{ + _ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{ OrganizationID: org.ID, JobID: pj.ID, CreatedBy: user.ID, }) } - detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh) - detector.Start() - tickCh <- now - // Make sure that only MaxJobsPerRun jobs are terminated. - stats := <-statsCh + stats := env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, jobreaper.MaxJobsPerRun) // Run the detector again and make sure that only the remaining job is // terminated. - tickCh <- now - stats = <-statsCh + stats = env.tick(ctx, now) require.NoError(t, stats.Error) require.Len(t, stats.TerminatedJobIDs, 1) - - detector.Close() - detector.Wait() } // wrapDBAuthz adds our Authorization/RBAC around the given database store, to