mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
chore: randomize task status update times in load generator (#23058)
fixes https://github.com/coder/scaletest/issues/92 Randomizes the time between task status updates so that we don't send them all at the same time for load testing.
This commit is contained in:
+20
-11
@@ -3,6 +3,7 @@ package taskstatus
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -43,7 +44,8 @@ type Runner struct {
|
||||
doneReporting bool
|
||||
|
||||
// testing only
|
||||
clock quartz.Clock
|
||||
clock quartz.Clock
|
||||
randFloat64 func() float64
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -58,6 +60,7 @@ func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner {
|
||||
updater: newAppStatusUpdater(coderClient),
|
||||
cfg: cfg,
|
||||
clock: quartz.NewReal(),
|
||||
randFloat64: rand.Float64,
|
||||
reportTimes: make(map[int]time.Time),
|
||||
}
|
||||
}
|
||||
@@ -221,13 +224,25 @@ func (r *Runner) reportTaskStatus(ctx context.Context) error {
|
||||
startedReporting := r.clock.Now("reportTaskStatus", "startedReporting")
|
||||
msgNo := 0
|
||||
|
||||
done := xerrors.New("done reporting task status") // sentinel error
|
||||
waiter := r.clock.TickerFunc(ctx, r.cfg.ReportStatusPeriod, func() error {
|
||||
getRandPeriod := func() time.Duration {
|
||||
// vary the period by +-50% so that updates are not synchronized across runners, which would create
|
||||
// artificially large instantaneous stress on Coder and the database.
|
||||
p := (r.randFloat64() + 0.5) * r.cfg.ReportStatusPeriod.Seconds()
|
||||
return time.Duration(p * float64(time.Second))
|
||||
}
|
||||
tmr := r.clock.NewTimer(getRandPeriod(), "reportTaskStatus")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-tmr.C:
|
||||
tmr.Reset(getRandPeriod(), "reportTaskStatus", "tick")
|
||||
}
|
||||
r.mu.Lock()
|
||||
now := r.clock.Now("reportTaskStatus", "tick")
|
||||
r.reportTimes[msgNo] = now
|
||||
// It's important that we set doneReporting along with a final report, since the watchWorkspaceUpdates goroutine
|
||||
// needs a update to wake up and check if we're done. We could introduce a secondary signaling channel, but
|
||||
// needs an update to wake up and check if we're done. We could introduce a secondary signaling channel, but
|
||||
// it adds a lot of complexity and will be hard to test. We expect the tick period to be much smaller than the
|
||||
// report status duration, so one extra tick is not a big deal.
|
||||
if now.After(startedReporting.Add(r.cfg.ReportStatusDuration)) {
|
||||
@@ -249,15 +264,9 @@ func (r *Runner) reportTaskStatus(ctx context.Context) error {
|
||||
// note that it's safe to read r.doneReporting here without a lock because we're the only goroutine that sets
|
||||
// it.
|
||||
if r.doneReporting {
|
||||
return done // causes the ticker to exit due to the sentinel error
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}, "reportTaskStatus")
|
||||
err := waiter.Wait()
|
||||
if xerrors.Is(err, done) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func parseStatusMessage(message string) (int, bool) {
|
||||
|
||||
@@ -196,10 +196,11 @@ func TestRunner_Run(t *testing.T) {
|
||||
updater: fUpdater,
|
||||
cfg: cfg,
|
||||
clock: mClock,
|
||||
randFloat64: func() float64 { return 0.5 }, // not random in tests
|
||||
reportTimes: make(map[int]time.Time),
|
||||
}
|
||||
|
||||
reportTickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
|
||||
reportTickerTrap := mClock.Trap().NewTimer("reportTaskStatus")
|
||||
defer reportTickerTrap.Close()
|
||||
sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
|
||||
defer sinceTrap.Close()
|
||||
@@ -318,10 +319,11 @@ func TestRunner_RunMissedUpdate(t *testing.T) {
|
||||
updater: fUpdater,
|
||||
cfg: cfg,
|
||||
clock: mClock,
|
||||
randFloat64: func() float64 { return 0.5 }, // not random in tests
|
||||
reportTimes: make(map[int]time.Time),
|
||||
}
|
||||
|
||||
tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
|
||||
tickerTrap := mClock.Trap().NewTimer("reportTaskStatus")
|
||||
defer tickerTrap.Close()
|
||||
sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
|
||||
defer sinceTrap.Close()
|
||||
@@ -443,10 +445,11 @@ func TestRunner_Run_WithErrors(t *testing.T) {
|
||||
updater: fUpdater,
|
||||
cfg: cfg,
|
||||
clock: mClock,
|
||||
randFloat64: func() float64 { return 0.5 }, // not random in tests
|
||||
reportTimes: make(map[int]time.Time),
|
||||
}
|
||||
|
||||
tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
|
||||
tickerTrap := mClock.Trap().NewTimer("reportTaskStatus")
|
||||
defer tickerTrap.Close()
|
||||
buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace")
|
||||
defer buildTickerTrap.Close()
|
||||
@@ -544,6 +547,7 @@ func TestRunner_Run_BuildFailed(t *testing.T) {
|
||||
updater: fUpdater,
|
||||
cfg: cfg,
|
||||
clock: mClock,
|
||||
randFloat64: func() float64 { return 0.5 }, // not random in tests
|
||||
reportTimes: make(map[int]time.Time),
|
||||
}
|
||||
|
||||
@@ -673,10 +677,11 @@ func TestRunner_Cleanup(t *testing.T) {
|
||||
}
|
||||
|
||||
runner := &Runner{
|
||||
client: fakeClient,
|
||||
updater: newFakeAppStatusUpdater(t),
|
||||
cfg: cfg,
|
||||
clock: quartz.NewMock(t),
|
||||
client: fakeClient,
|
||||
updater: newFakeAppStatusUpdater(t),
|
||||
cfg: cfg,
|
||||
clock: quartz.NewMock(t),
|
||||
randFloat64: func() float64 { return 0.5 }, // not random in tests
|
||||
}
|
||||
|
||||
logWriter := testutil.NewTestLogWriter(t)
|
||||
|
||||
Reference in New Issue
Block a user