test: add testutil.WaitBuffer and replace time.Sleep in tests (#22922)

WaitBuffer is a thread-safe io.Writer that supports blocking until
accumulated output matches a substring or custom predicate. It
replaces ad-hoc safeBuffer/syncWriter types and time.Sleep-based
poll loops in tests with signal-driven waits.

- WaitFor/WaitForNth/WaitForCond for blocking on output
- Replace custom buffer types in cli/sync_test.go and
  provisionersdk/agent_test.go
- Convert time.Sleep poll loops to require.Eventually/require.Never
  in cli/ssh_test.go, coderd/activitybump_test.go,
  coderd/workspaceagentsrpc_test.go, workspaceproxy_test.go, and
  scaletest tests
This commit is contained in:
Mathias Fredriksson
2026-03-12 18:07:52 +02:00
committed by GitHub
parent a6697b1b29
commit 57af7abf1f
11 changed files with 449 additions and 161 deletions
+117
View File
@@ -0,0 +1,117 @@
package testutil
import (
"bytes"
"context"
"strings"
"sync"
"testing"
)
// WaitBuffer is a thread-safe buffer (io.Writer) that supports
// blocking until the accumulated content matches a condition.
// It is intended for tests that need to wait for specific output
// from a command or process before proceeding.
//
// WaitBuffer is safe for concurrent use. Multiple goroutines may
// write to it, and WaitFor/WaitForCond may be called from any
// goroutine.
type WaitBuffer struct {
mu sync.Mutex
buf bytes.Buffer
waiters []*wbWaiter
}
type wbWaiter struct {
cond func(string) bool
ch chan struct{}
once sync.Once
}
// NewWaitBuffer returns a new WaitBuffer. It can be used as a
// plain thread-safe io.Writer even if WaitFor is never called.
func NewWaitBuffer() *WaitBuffer {
return &WaitBuffer{}
}
// Write implements io.Writer. It is safe for concurrent use.
func (wb *WaitBuffer) Write(p []byte) (int, error) {
wb.mu.Lock()
defer wb.mu.Unlock()
n, err := wb.buf.Write(p)
s := wb.buf.String()
for _, w := range wb.waiters {
if w.cond(s) {
w.once.Do(func() { close(w.ch) })
}
}
return n, err
}
// WaitFor blocks until the accumulated output contains signal or
// ctx expires. Returns nil on match, ctx.Err() on timeout.
// Safe to call from any goroutine.
func (wb *WaitBuffer) WaitFor(ctx context.Context, signal string) error {
return wb.WaitForNth(ctx, signal, 1)
}
// WaitForNth blocks until the accumulated output contains at least
// n occurrences of signal, or ctx expires. Returns nil on match,
// ctx.Err() on timeout. Safe to call from any goroutine.
func (wb *WaitBuffer) WaitForNth(ctx context.Context, signal string, n int) error {
return wb.WaitForCond(ctx, func(s string) bool {
return strings.Count(s, signal) >= n
})
}
// WaitForCond blocks until cond returns true for the accumulated
// output, or ctx expires. Returns nil on match, ctx.Err() on
// timeout. Safe to call from any goroutine.
func (wb *WaitBuffer) WaitForCond(ctx context.Context, cond func(string) bool) error {
wb.mu.Lock()
if cond(wb.buf.String()) {
wb.mu.Unlock()
return nil
}
w := &wbWaiter{
cond: cond,
ch: make(chan struct{}),
}
wb.waiters = append(wb.waiters, w)
wb.mu.Unlock()
select {
case <-w.ch:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// RequireWaitFor blocks until the accumulated output contains
// signal or ctx expires. On timeout, fails the test with a
// message showing what was expected and what was written so far.
//
// Safety: Must only be called from the Go routine that created
// `t`.
func (wb *WaitBuffer) RequireWaitFor(ctx context.Context, t testing.TB, signal string) {
t.Helper()
if err := wb.WaitFor(ctx, signal); err != nil {
t.Fatalf("WaitBuffer: signal %q not found; buffer contents:\n%s", signal, wb.String())
}
}
// Bytes returns a copy of the accumulated output.
func (wb *WaitBuffer) Bytes() []byte {
wb.mu.Lock()
defer wb.mu.Unlock()
return bytes.Clone(wb.buf.Bytes())
}
// String returns the accumulated output as a string.
func (wb *WaitBuffer) String() string {
wb.mu.Lock()
defer wb.mu.Unlock()
return wb.buf.String()
}
+271
View File
@@ -0,0 +1,271 @@
package testutil_test
import (
"context"
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/testutil"
)
func TestWaitBuffer_WaitFor_Blocks(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wb := testutil.NewWaitBuffer()
done := make(chan struct{})
go func() {
defer close(done)
_ = wb.WaitFor(ctx, "hello")
}()
// Write the signal after the goroutine is blocking.
_, err := wb.Write([]byte("hello"))
require.NoError(t, err)
select {
case <-done:
case <-ctx.Done():
t.Fatal("WaitFor did not unblock after signal was written")
}
}
func TestWaitBuffer_WaitFor_AlreadyPresent(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wb := testutil.NewWaitBuffer()
_, err := wb.Write([]byte("already here"))
require.NoError(t, err)
// Signal is already in the buffer; WaitFor returns immediately.
require.NoError(t, wb.WaitFor(ctx, "already"))
}
func TestWaitBuffer_WaitFor_ContextExpired(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
cancel() // Already expired.
wb := testutil.NewWaitBuffer()
err := wb.WaitFor(ctx, "never")
require.ErrorIs(t, err, context.Canceled)
}
func TestWaitBuffer_WaitFor_MultipleWrites(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wb := testutil.NewWaitBuffer()
// Write partial content that doesn't satisfy the condition.
_, err := wb.Write([]byte("hell"))
require.NoError(t, err)
done := make(chan struct{})
go func() {
defer close(done)
_ = wb.WaitFor(ctx, "hello")
}()
// Complete the signal with a second write.
_, err = wb.Write([]byte("o"))
require.NoError(t, err)
select {
case <-done:
case <-ctx.Done():
t.Fatal("WaitFor did not unblock after multiple writes completed the signal")
}
}
func TestWaitBuffer_WaitForCond(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wb := testutil.NewWaitBuffer()
done := make(chan struct{})
go func() {
defer close(done)
// Wait until the buffer has at least 10 bytes.
_ = wb.WaitForCond(ctx, func(s string) bool {
return len(s) >= 10
})
}()
_, err := wb.Write([]byte("12345"))
require.NoError(t, err)
_, err = wb.Write([]byte("67890"))
require.NoError(t, err)
select {
case <-done:
case <-ctx.Done():
t.Fatal("WaitForCond did not unblock when condition was met")
}
}
func TestWaitBuffer_ConcurrentWrites(t *testing.T) {
t.Parallel()
wb := testutil.NewWaitBuffer()
var wg sync.WaitGroup
const writers = 10
const iterations = 100
wg.Add(writers)
for i := range writers {
go func() {
defer wg.Done()
for j := range iterations {
_, _ = wb.Write([]byte(fmt.Sprintf("w%d-%d ", i, j)))
}
}()
}
wg.Wait()
// Every write should have landed; verify no data was lost by
// checking the length is at least as large as expected.
assert.GreaterOrEqual(t, len(wb.Bytes()), writers*iterations)
}
func TestWaitBuffer_WaitFor_BackgroundGoroutine(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
cancel() // Expire immediately.
wb := testutil.NewWaitBuffer()
// WaitFor from a background goroutine should return the
// context error rather than calling t.Fatal.
done := make(chan error, 1)
go func() {
done <- wb.WaitFor(ctx, "never")
}()
err := <-done
require.ErrorIs(t, err, context.Canceled)
}
func TestWaitBuffer_SequentialWaits(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wb := testutil.NewWaitBuffer()
_, err := wb.Write([]byte("first "))
require.NoError(t, err)
require.NoError(t, wb.WaitFor(ctx, "first"))
_, err = wb.Write([]byte("second"))
require.NoError(t, err)
require.NoError(t, wb.WaitFor(ctx, "second"))
}
func TestWaitBuffer_WaitForNth_Blocks(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wb := testutil.NewWaitBuffer()
_, err := wb.Write([]byte("Foo "))
require.NoError(t, err)
// First occurrence is already present, but we want two.
done := make(chan struct{})
go func() {
defer close(done)
_ = wb.WaitForNth(ctx, "Foo", 2)
}()
_, err = wb.Write([]byte("Bar Foo"))
require.NoError(t, err)
select {
case <-done:
case <-ctx.Done():
t.Fatal("WaitForNth did not unblock after second occurrence")
}
}
func TestWaitBuffer_WaitForNth_AlreadySatisfied(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wb := testutil.NewWaitBuffer()
_, err := wb.Write([]byte("Foo Foo Foo"))
require.NoError(t, err)
// All three occurrences already present.
require.NoError(t, wb.WaitForNth(ctx, "Foo", 3))
}
func TestWaitBuffer_RequireWaitFor_Timeout(t *testing.T) {
t.Parallel()
// Use a mock testing.TB to capture the fatal call without
// killing the real test.
mock := &tbMock{}
ctx, cancel := context.WithCancel(context.Background())
cancel()
wb := testutil.NewWaitBuffer()
_, err := wb.Write([]byte("some output"))
require.NoError(t, err)
wb.RequireWaitFor(ctx, mock, "missing-signal")
assert.True(t, mock.failed(), "expected RequireWaitFor to fail the mock test")
}
// tbMock is a minimal testing.TB that records Fatalf calls.
type tbMock struct {
testing.TB // Embed to satisfy the interface.
mu sync.Mutex
fatalCalls int
}
func (*tbMock) Helper() {}
func (m *tbMock) Fatalf(string, ...any) {
m.mu.Lock()
defer m.mu.Unlock()
m.fatalCalls++
}
func (m *tbMock) failed() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.fatalCalls > 0
}
func TestWaitBuffer_Bytes_ReturnsCopy(t *testing.T) {
t.Parallel()
wb := testutil.NewWaitBuffer()
_, err := wb.Write([]byte("original"))
require.NoError(t, err)
b := wb.Bytes()
// Mutate the returned slice.
for i := range b {
b[i] = 'X'
}
// The internal buffer must be unchanged.
require.Equal(t, "original", wb.String())
}
func TestWaitBuffer_PlainBuffer(t *testing.T) {
t.Parallel()
wb := testutil.NewWaitBuffer()
_, err := wb.Write([]byte("hello "))
require.NoError(t, err)
_, err = wb.Write([]byte("world"))
require.NoError(t, err)
require.Equal(t, "hello world", wb.String())
require.Equal(t, []byte("hello world"), wb.Bytes())
}