mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
refactor: extract test helpers in coderd/jobreaper to reduce duplication (#23001)
This commit is contained in:
+201
-344
@@ -8,6 +8,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
"github.com/coder/coder/v2/coderd/database/dbfake"
|
||||
"github.com/coder/coder/v2/coderd/database/dbgen"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
||||
"github.com/coder/coder/v2/coderd/database/pubsub"
|
||||
"github.com/coder/coder/v2/coderd/jobreaper"
|
||||
"github.com/coder/coder/v2/coderd/provisionerdserver"
|
||||
"github.com/coder/coder/v2/coderd/rbac"
|
||||
@@ -31,48 +33,101 @@ func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m, testutil.GoleakOptions...)
|
||||
}
|
||||
|
||||
// detectorTestEnv provides common infrastructure for jobreaper detector tests,
|
||||
// reducing the repeated setup/teardown boilerplate across every test function.
|
||||
type detectorTestEnv struct {
|
||||
t *testing.T
|
||||
DB database.Store
|
||||
Pubsub pubsub.Pubsub
|
||||
detector *jobreaper.Detector
|
||||
tickCh chan time.Time
|
||||
statsCh chan jobreaper.Stats
|
||||
}
|
||||
|
||||
// newDetectorTestEnv creates a new test environment with a started detector.
|
||||
func newDetectorTestEnv(ctx context.Context, t *testing.T) *detectorTestEnv {
|
||||
t.Helper()
|
||||
db, ps := dbtestutil.NewDB(t)
|
||||
log := testutil.Logger(t)
|
||||
tickCh := make(chan time.Time)
|
||||
statsCh := make(chan jobreaper.Stats)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), ps, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
|
||||
return &detectorTestEnv{
|
||||
t: t,
|
||||
DB: db,
|
||||
Pubsub: ps,
|
||||
detector: detector,
|
||||
tickCh: tickCh,
|
||||
statsCh: statsCh,
|
||||
}
|
||||
}
|
||||
|
||||
// tick sends a tick with the given time and returns the stats from the
|
||||
// detector run. It respects context cancellation to avoid blocking forever
|
||||
// if the detector exits unexpectedly.
|
||||
//
|
||||
// tick must not be called from a separate goroutine, as it calls
|
||||
// require.FailNow which uses runtime.Goexit under the hood.
|
||||
func (e *detectorTestEnv) tick(ctx context.Context, now time.Time) jobreaper.Stats {
|
||||
e.t.Helper()
|
||||
testutil.RequireSend(ctx, e.t, e.tickCh, now)
|
||||
return testutil.RequireReceive(ctx, e.t, e.statsCh)
|
||||
}
|
||||
|
||||
// close stops the detector and waits for it to finish.
|
||||
func (e *detectorTestEnv) close() {
|
||||
e.detector.Close()
|
||||
e.detector.Wait()
|
||||
}
|
||||
|
||||
// requireTerminatedJob asserts that a provisioner job was properly terminated
|
||||
// by the job reaper with the expected reap type (hung or pending).
|
||||
func requireTerminatedJob(ctx context.Context, t *testing.T, db database.Store, jobID uuid.UUID, now time.Time, reapType jobreaper.ReapType) {
|
||||
t.Helper()
|
||||
job, err := db.GetProvisionerJobByID(ctx, jobID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
if reapType == jobreaper.Pending {
|
||||
require.True(t, job.StartedAt.Valid)
|
||||
require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second)
|
||||
}
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, fmt.Sprintf("Build has been detected as %s", reapType))
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
}
|
||||
|
||||
func TestDetectorNoJobs(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- time.Now()
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, time.Now())
|
||||
require.NoError(t, stats.Error)
|
||||
require.Empty(t, stats.TerminatedJobIDs)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
}
|
||||
|
||||
func TestDetectorNoHungJobs(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
// Insert some jobs that are running and haven't been updated in a while,
|
||||
// but not enough to be considered hung.
|
||||
now := time.Now()
|
||||
org := dbgen.Organization(t, db, database.Organization{})
|
||||
user := dbgen.User(t, db, database.User{})
|
||||
file := dbgen.File(t, db, database.File{})
|
||||
org := dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user := dbgen.User(t, env.DB, database.User{})
|
||||
file := dbgen.File(t, env.DB, database.File{})
|
||||
for i := 0; i < 5; i++ {
|
||||
dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: now.Add(-time.Minute * 5),
|
||||
UpdatedAt: now.Add(-time.Minute * time.Duration(i)),
|
||||
StartedAt: sql.NullTime{
|
||||
@@ -89,51 +144,40 @@ func TestDetectorNoHungJobs(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Empty(t, stats.TerminatedJobIDs)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
}
|
||||
|
||||
func TestDetectorHungWorkspaceBuild(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
twentyMinAgo = now.Add(-time.Minute * 20)
|
||||
tenMinAgo = now.Add(-time.Minute * 10)
|
||||
sixMinAgo = now.Add(-time.Minute * 6)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`)
|
||||
)
|
||||
|
||||
// Previous build (completed successfully).
|
||||
previousBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
previousBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{
|
||||
OrganizationID: org.ID,
|
||||
OwnerID: user.ID,
|
||||
}).Pubsub(pubsub).Seed(database.WorkspaceBuild{}).
|
||||
}).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}).
|
||||
ProvisionerState(expectedWorkspaceBuildState).
|
||||
Succeeded(dbfake.WithJobCompletedAt(twentyMinAgo)).
|
||||
Do()
|
||||
|
||||
// Current build (hung - running job with UpdatedAt > 5 min ago).
|
||||
currentBuild := dbfake.WorkspaceBuild(t, db, previousBuild.Workspace).
|
||||
Pubsub(pubsub).
|
||||
currentBuild := dbfake.WorkspaceBuild(t, env.DB, previousBuild.Workspace).
|
||||
Pubsub(env.Pubsub).
|
||||
Seed(database.WorkspaceBuild{BuildNumber: 2}).
|
||||
Starting(dbfake.WithJobStartedAt(tenMinAgo), dbfake.WithJobUpdatedAt(sixMinAgo)).
|
||||
Do()
|
||||
@@ -141,70 +185,52 @@ func TestDetectorHungWorkspaceBuild(t *testing.T) {
|
||||
t.Log("previous job ID: ", previousBuild.Build.JobID)
|
||||
t.Log("current job ID: ", currentBuild.Build.JobID)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0])
|
||||
|
||||
// Check that the current provisioner job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as hung")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung)
|
||||
|
||||
// Check that the provisioner state was copied.
|
||||
build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
require.NoError(t, err)
|
||||
provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
}
|
||||
|
||||
func TestDetectorHungWorkspaceBuildNoOverrideState(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
twentyMinAgo = now.Add(-time.Minute * 20)
|
||||
tenMinAgo = now.Add(-time.Minute * 10)
|
||||
sixMinAgo = now.Add(-time.Minute * 6)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`)
|
||||
)
|
||||
|
||||
// Previous build (completed successfully).
|
||||
previousBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
previousBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{
|
||||
OrganizationID: org.ID,
|
||||
OwnerID: user.ID,
|
||||
}).Pubsub(pubsub).Seed(database.WorkspaceBuild{}).
|
||||
}).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}).
|
||||
ProvisionerState([]byte(`{"dean":"NOT cool","colin":"also NOT cool"}`)).
|
||||
Succeeded(dbfake.WithJobCompletedAt(twentyMinAgo)).
|
||||
Do()
|
||||
|
||||
// Current build (hung - running job with UpdatedAt > 5 min ago).
|
||||
// This build already has provisioner state, which should NOT be overridden.
|
||||
currentBuild := dbfake.WorkspaceBuild(t, db, previousBuild.Workspace).
|
||||
Pubsub(pubsub).
|
||||
currentBuild := dbfake.WorkspaceBuild(t, env.DB, previousBuild.Workspace).
|
||||
Pubsub(env.Pubsub).
|
||||
Seed(database.WorkspaceBuild{
|
||||
BuildNumber: 2,
|
||||
}).ProvisionerState(expectedWorkspaceBuildState).
|
||||
@@ -214,159 +240,107 @@ func TestDetectorHungWorkspaceBuildNoOverrideState(t *testing.T) {
|
||||
t.Log("previous job ID: ", previousBuild.Build.JobID)
|
||||
t.Log("current job ID: ", currentBuild.Build.JobID)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0])
|
||||
|
||||
// Check that the current provisioner job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as hung")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung)
|
||||
|
||||
// Check that the provisioner state was NOT copied.
|
||||
build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
require.NoError(t, err)
|
||||
provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
}
|
||||
|
||||
func TestDetectorHungWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
tenMinAgo = now.Add(-time.Minute * 10)
|
||||
sixMinAgo = now.Add(-time.Minute * 6)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`)
|
||||
)
|
||||
|
||||
// First build (hung - no previous build exists).
|
||||
// This build has provisioner state, which should NOT be overridden.
|
||||
currentBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
currentBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{
|
||||
OrganizationID: org.ID,
|
||||
OwnerID: user.ID,
|
||||
}).Pubsub(pubsub).Seed(database.WorkspaceBuild{}).
|
||||
}).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}).
|
||||
ProvisionerState(expectedWorkspaceBuildState).
|
||||
Starting(dbfake.WithJobStartedAt(tenMinAgo), dbfake.WithJobUpdatedAt(sixMinAgo)).
|
||||
Do()
|
||||
|
||||
t.Log("current job ID: ", currentBuild.Build.JobID)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0])
|
||||
|
||||
// Check that the current provisioner job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as hung")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung)
|
||||
|
||||
// Check that the provisioner state was NOT updated.
|
||||
build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
require.NoError(t, err)
|
||||
provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
}
|
||||
|
||||
func TestDetectorPendingWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
thirtyFiveMinAgo = now.Add(-time.Minute * 35)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`)
|
||||
)
|
||||
|
||||
// First build (hung pending - no previous build exists).
|
||||
// This build has provisioner state, which should NOT be overridden.
|
||||
currentBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
currentBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{
|
||||
OrganizationID: org.ID,
|
||||
OwnerID: user.ID,
|
||||
}).Pubsub(pubsub).Seed(database.WorkspaceBuild{}).
|
||||
}).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}).
|
||||
ProvisionerState(expectedWorkspaceBuildState).
|
||||
Pending(dbfake.WithJobCreatedAt(thirtyFiveMinAgo), dbfake.WithJobUpdatedAt(thirtyFiveMinAgo)).
|
||||
Do()
|
||||
|
||||
t.Log("current job ID: ", currentBuild.Build.JobID)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0])
|
||||
|
||||
// Check that the current provisioner job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.StartedAt.Valid)
|
||||
require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as pending")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Pending)
|
||||
|
||||
// Check that the provisioner state was NOT updated.
|
||||
build, err := db.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
build, err := env.DB.GetWorkspaceBuildByID(ctx, currentBuild.Build.ID)
|
||||
require.NoError(t, err)
|
||||
provisionerStateRow, err := db.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
provisionerStateRow, err := env.DB.GetWorkspaceBuildProvisionerStateByID(ctx, build.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedWorkspaceBuildState, provisionerStateRow.ProvisionerState)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
}
|
||||
|
||||
// TestDetectorWorkspaceBuildForDormantWorkspace ensures that the jobreaper has
|
||||
@@ -378,34 +352,30 @@ func TestDetectorPendingWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testin
|
||||
func TestDetectorWorkspaceBuildForDormantWorkspace(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
tenMinAgo = now.Add(-time.Minute * 10)
|
||||
sixMinAgo = now.Add(-time.Minute * 6)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`)
|
||||
)
|
||||
|
||||
// First build (hung - running job with UpdatedAt > 5 min ago).
|
||||
// This build has provisioner state, which should NOT be overridden.
|
||||
// The workspace is dormant from the start.
|
||||
currentBuild := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
currentBuild := dbfake.WorkspaceBuild(t, env.DB, database.WorkspaceTable{
|
||||
OrganizationID: org.ID,
|
||||
OwnerID: user.ID,
|
||||
DormantAt: sql.NullTime{
|
||||
Time: now.Add(-time.Hour),
|
||||
Valid: true,
|
||||
},
|
||||
}).Pubsub(pubsub).Seed(database.WorkspaceBuild{}).
|
||||
}).Pubsub(env.Pubsub).Seed(database.WorkspaceBuild{}).
|
||||
ProvisionerState(expectedWorkspaceBuildState).
|
||||
Starting(dbfake.WithJobStartedAt(tenMinAgo), dbfake.WithJobUpdatedAt(sixMinAgo)).
|
||||
Do()
|
||||
@@ -416,50 +386,32 @@ func TestDetectorWorkspaceBuildForDormantWorkspace(t *testing.T) {
|
||||
// thing.
|
||||
require.Equal(t, rbac.ResourceWorkspaceDormant.Type, currentBuild.Workspace.RBACObject().Type)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
require.Equal(t, currentBuild.Build.JobID, stats.TerminatedJobIDs[0])
|
||||
|
||||
// Check that the current provisioner job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, currentBuild.Build.JobID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as hung")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
requireTerminatedJob(ctx, t, env.DB, currentBuild.Build.JobID, now, jobreaper.Hung)
|
||||
}
|
||||
|
||||
func TestDetectorHungOtherJobTypes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
tenMinAgo = now.Add(-time.Minute * 10)
|
||||
sixMinAgo = now.Add(-time.Minute * 6)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
file = dbgen.File(t, db, database.File{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
file = dbgen.File(t, env.DB, database.File{})
|
||||
|
||||
// Template import job.
|
||||
templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: tenMinAgo,
|
||||
UpdatedAt: sixMinAgo,
|
||||
StartedAt: sql.NullTime{
|
||||
@@ -474,7 +426,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) {
|
||||
Type: database.ProvisionerJobTypeTemplateVersionImport,
|
||||
Input: []byte("{}"),
|
||||
})
|
||||
_ = dbgen.TemplateVersion(t, db, database.TemplateVersion{
|
||||
_ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{
|
||||
OrganizationID: org.ID,
|
||||
JobID: templateImportJob.ID,
|
||||
CreatedBy: user.ID,
|
||||
@@ -482,7 +434,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) {
|
||||
)
|
||||
|
||||
// Template dry-run job.
|
||||
dryRunVersion := dbgen.TemplateVersion(t, db, database.TemplateVersion{
|
||||
dryRunVersion := dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{
|
||||
OrganizationID: org.ID,
|
||||
CreatedBy: user.ID,
|
||||
})
|
||||
@@ -490,7 +442,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) {
|
||||
TemplateVersionID: dryRunVersion.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
templateDryRunJob := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
templateDryRunJob := dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: tenMinAgo,
|
||||
UpdatedAt: sixMinAgo,
|
||||
StartedAt: sql.NullTime{
|
||||
@@ -509,60 +461,33 @@ func TestDetectorHungOtherJobTypes(t *testing.T) {
|
||||
t.Log("template import job ID: ", templateImportJob.ID)
|
||||
t.Log("template dry-run job ID: ", templateDryRunJob.ID)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 2)
|
||||
require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID)
|
||||
require.Contains(t, stats.TerminatedJobIDs, templateDryRunJob.ID)
|
||||
|
||||
// Check that the template import job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, templateImportJob.ID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as hung")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
|
||||
// Check that the template dry-run job was updated.
|
||||
job, err = db.GetProvisionerJobByID(ctx, templateDryRunJob.ID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as hung")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
// Check that both jobs were terminated as hung.
|
||||
requireTerminatedJob(ctx, t, env.DB, templateImportJob.ID, now, jobreaper.Hung)
|
||||
requireTerminatedJob(ctx, t, env.DB, templateDryRunJob.ID, now, jobreaper.Hung)
|
||||
}
|
||||
|
||||
func TestDetectorPendingOtherJobTypes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
thirtyFiveMinAgo = now.Add(-time.Minute * 35)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
file = dbgen.File(t, db, database.File{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
file = dbgen.File(t, env.DB, database.File{})
|
||||
|
||||
// Template import job.
|
||||
templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: thirtyFiveMinAgo,
|
||||
UpdatedAt: thirtyFiveMinAgo,
|
||||
StartedAt: sql.NullTime{
|
||||
@@ -577,7 +502,7 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) {
|
||||
Type: database.ProvisionerJobTypeTemplateVersionImport,
|
||||
Input: []byte("{}"),
|
||||
})
|
||||
_ = dbgen.TemplateVersion(t, db, database.TemplateVersion{
|
||||
_ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{
|
||||
OrganizationID: org.ID,
|
||||
JobID: templateImportJob.ID,
|
||||
CreatedBy: user.ID,
|
||||
@@ -585,7 +510,7 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) {
|
||||
)
|
||||
|
||||
// Template dry-run job.
|
||||
dryRunVersion := dbgen.TemplateVersion(t, db, database.TemplateVersion{
|
||||
dryRunVersion := dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{
|
||||
OrganizationID: org.ID,
|
||||
CreatedBy: user.ID,
|
||||
})
|
||||
@@ -593,7 +518,7 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) {
|
||||
TemplateVersionID: dryRunVersion.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
templateDryRunJob := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
templateDryRunJob := dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: thirtyFiveMinAgo,
|
||||
UpdatedAt: thirtyFiveMinAgo,
|
||||
StartedAt: sql.NullTime{
|
||||
@@ -612,65 +537,34 @@ func TestDetectorPendingOtherJobTypes(t *testing.T) {
|
||||
t.Log("template import job ID: ", templateImportJob.ID)
|
||||
t.Log("template dry-run job ID: ", templateDryRunJob.ID)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 2)
|
||||
require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID)
|
||||
require.Contains(t, stats.TerminatedJobIDs, templateDryRunJob.ID)
|
||||
|
||||
// Check that the template import job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, templateImportJob.ID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.StartedAt.Valid)
|
||||
require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as pending")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
|
||||
// Check that the template dry-run job was updated.
|
||||
job, err = db.GetProvisionerJobByID(ctx, templateDryRunJob.ID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.StartedAt.Valid)
|
||||
require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as pending")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
// Check that both jobs were terminated as pending.
|
||||
requireTerminatedJob(ctx, t, env.DB, templateImportJob.ID, now, jobreaper.Pending)
|
||||
requireTerminatedJob(ctx, t, env.DB, templateDryRunJob.ID, now, jobreaper.Pending)
|
||||
}
|
||||
|
||||
func TestDetectorHungCanceledJob(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
tenMinAgo = now.Add(-time.Minute * 10)
|
||||
sixMinAgo = now.Add(-time.Minute * 6)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
file = dbgen.File(t, db, database.File{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
file = dbgen.File(t, env.DB, database.File{})
|
||||
|
||||
// Template import job.
|
||||
templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: tenMinAgo,
|
||||
CanceledAt: sql.NullTime{
|
||||
Time: tenMinAgo,
|
||||
@@ -689,7 +583,7 @@ func TestDetectorHungCanceledJob(t *testing.T) {
|
||||
Type: database.ProvisionerJobTypeTemplateVersionImport,
|
||||
Input: []byte("{}"),
|
||||
})
|
||||
_ = dbgen.TemplateVersion(t, db, database.TemplateVersion{
|
||||
_ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{
|
||||
OrganizationID: org.ID,
|
||||
JobID: templateImportJob.ID,
|
||||
CreatedBy: user.ID,
|
||||
@@ -698,27 +592,13 @@ func TestDetectorHungCanceledJob(t *testing.T) {
|
||||
|
||||
t.Log("template import job ID: ", templateImportJob.ID)
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID)
|
||||
|
||||
// Check that the job was updated.
|
||||
job, err := db.GetProvisionerJobByID(ctx, templateImportJob.ID)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
|
||||
require.True(t, job.CompletedAt.Valid)
|
||||
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
|
||||
require.True(t, job.Error.Valid)
|
||||
require.Contains(t, job.Error.String, "Build has been detected as hung")
|
||||
require.False(t, job.ErrorCode.Valid)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
requireTerminatedJob(ctx, t, env.DB, templateImportJob.ID, now, jobreaper.Hung)
|
||||
}
|
||||
|
||||
func TestDetectorPushesLogs(t *testing.T) {
|
||||
@@ -753,24 +633,20 @@ func TestDetectorPushesLogs(t *testing.T) {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
var (
|
||||
now = time.Now()
|
||||
tenMinAgo = now.Add(-time.Minute * 10)
|
||||
sixMinAgo = now.Add(-time.Minute * 6)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
file = dbgen.File(t, db, database.File{})
|
||||
org = dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user = dbgen.User(t, env.DB, database.User{})
|
||||
file = dbgen.File(t, env.DB, database.File{})
|
||||
|
||||
// Template import job.
|
||||
templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
templateImportJob = dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: tenMinAgo,
|
||||
UpdatedAt: sixMinAgo,
|
||||
StartedAt: sql.NullTime{
|
||||
@@ -785,7 +661,7 @@ func TestDetectorPushesLogs(t *testing.T) {
|
||||
Type: database.ProvisionerJobTypeTemplateVersionImport,
|
||||
Input: []byte("{}"),
|
||||
})
|
||||
_ = dbgen.TemplateVersion(t, db, database.TemplateVersion{
|
||||
_ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{
|
||||
OrganizationID: org.ID,
|
||||
JobID: templateImportJob.ID,
|
||||
CreatedBy: user.ID,
|
||||
@@ -806,17 +682,14 @@ func TestDetectorPushesLogs(t *testing.T) {
|
||||
insertParams.Source = append(insertParams.Source, database.LogSourceProvisioner)
|
||||
insertParams.Output = append(insertParams.Output, fmt.Sprintf("Output %d", i))
|
||||
}
|
||||
logs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
|
||||
logs, err := env.DB.InsertProvisionerJobLogs(ctx, insertParams)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, logs, 10)
|
||||
}
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
|
||||
// Create pubsub subscription to listen for new log events.
|
||||
pubsubCalled := make(chan int64, 1)
|
||||
pubsubCancel, err := pubsub.Subscribe(provisionersdk.ProvisionerJobLogsNotifyChannel(templateImportJob.ID), func(ctx context.Context, message []byte) {
|
||||
pubsubCancel, err := env.Pubsub.Subscribe(provisionersdk.ProvisionerJobLogsNotifyChannel(templateImportJob.ID), func(ctx context.Context, message []byte) {
|
||||
defer close(pubsubCalled)
|
||||
var event provisionersdk.ProvisionerJobLogsNotifyMessage
|
||||
err := json.Unmarshal(message, &event)
|
||||
@@ -830,9 +703,7 @@ func TestDetectorPushesLogs(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer pubsubCancel()
|
||||
|
||||
tickCh <- now
|
||||
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID)
|
||||
@@ -841,7 +712,7 @@ func TestDetectorPushesLogs(t *testing.T) {
|
||||
|
||||
// Get the jobs after the given time and check that they are what we
|
||||
// expect.
|
||||
logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
|
||||
logs, err := env.DB.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
|
||||
JobID: templateImportJob.ID,
|
||||
CreatedAfter: after,
|
||||
})
|
||||
@@ -862,15 +733,12 @@ func TestDetectorPushesLogs(t *testing.T) {
|
||||
}
|
||||
|
||||
// Double check the full log count.
|
||||
logs, err = db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
|
||||
logs, err = env.DB.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
|
||||
JobID: templateImportJob.ID,
|
||||
CreatedAfter: 0,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, logs, c.preLogCount+len(expectedLogs))
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -878,21 +746,18 @@ func TestDetectorPushesLogs(t *testing.T) {
|
||||
func TestDetectorMaxJobsPerRun(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
db, pubsub = dbtestutil.NewDB(t)
|
||||
log = testutil.Logger(t)
|
||||
tickCh = make(chan time.Time)
|
||||
statsCh = make(chan jobreaper.Stats)
|
||||
org = dbgen.Organization(t, db, database.Organization{})
|
||||
user = dbgen.User(t, db, database.User{})
|
||||
file = dbgen.File(t, db, database.File{})
|
||||
)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
env := newDetectorTestEnv(ctx, t)
|
||||
defer env.close()
|
||||
|
||||
org := dbgen.Organization(t, env.DB, database.Organization{})
|
||||
user := dbgen.User(t, env.DB, database.User{})
|
||||
file := dbgen.File(t, env.DB, database.File{})
|
||||
|
||||
// Create MaxJobsPerRun + 1 hung jobs.
|
||||
now := time.Now()
|
||||
for i := 0; i < jobreaper.MaxJobsPerRun+1; i++ {
|
||||
pj := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
|
||||
pj := dbgen.ProvisionerJob(t, env.DB, env.Pubsub, database.ProvisionerJob{
|
||||
CreatedAt: now.Add(-time.Hour),
|
||||
UpdatedAt: now.Add(-time.Hour),
|
||||
StartedAt: sql.NullTime{
|
||||
@@ -907,31 +772,23 @@ func TestDetectorMaxJobsPerRun(t *testing.T) {
|
||||
Type: database.ProvisionerJobTypeTemplateVersionImport,
|
||||
Input: []byte("{}"),
|
||||
})
|
||||
_ = dbgen.TemplateVersion(t, db, database.TemplateVersion{
|
||||
_ = dbgen.TemplateVersion(t, env.DB, database.TemplateVersion{
|
||||
OrganizationID: org.ID,
|
||||
JobID: pj.ID,
|
||||
CreatedBy: user.ID,
|
||||
})
|
||||
}
|
||||
|
||||
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
|
||||
detector.Start()
|
||||
tickCh <- now
|
||||
|
||||
// Make sure that only MaxJobsPerRun jobs are terminated.
|
||||
stats := <-statsCh
|
||||
stats := env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, jobreaper.MaxJobsPerRun)
|
||||
|
||||
// Run the detector again and make sure that only the remaining job is
|
||||
// terminated.
|
||||
tickCh <- now
|
||||
stats = <-statsCh
|
||||
stats = env.tick(ctx, now)
|
||||
require.NoError(t, stats.Error)
|
||||
require.Len(t, stats.TerminatedJobIDs, 1)
|
||||
|
||||
detector.Close()
|
||||
detector.Wait()
|
||||
}
|
||||
|
||||
// wrapDBAuthz adds our Authorization/RBAC around the given database store, to
|
||||
|
||||
Reference in New Issue
Block a user