mirror of
https://github.com/coder/coder.git
synced 2026-06-03 13:08:25 +00:00
57af7abf1f
WaitBuffer is a thread-safe io.Writer that supports blocking until accumulated output matches a substring or custom predicate. It replaces ad-hoc safeBuffer/syncWriter types and time.Sleep-based poll loops in tests with signal-driven waits. - WaitFor/WaitForNth/WaitForCond for blocking on output - Replace custom buffer types in cli/sync_test.go and provisionersdk/agent_test.go - Convert time.Sleep poll loops to require.Eventually/require.Never in cli/ssh_test.go, coderd/activitybump_test.go, coderd/workspaceagentsrpc_test.go, workspaceproxy_test.go, and scaletest tests
272 lines
6.1 KiB
Go
272 lines
6.1 KiB
Go
package testutil_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/coder/coder/v2/testutil"
|
|
)
|
|
|
|
func TestWaitBuffer_WaitFor_Blocks(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
_ = wb.WaitFor(ctx, "hello")
|
|
}()
|
|
|
|
// Write the signal after the goroutine is blocking.
|
|
_, err := wb.Write([]byte("hello"))
|
|
require.NoError(t, err)
|
|
|
|
select {
|
|
case <-done:
|
|
case <-ctx.Done():
|
|
t.Fatal("WaitFor did not unblock after signal was written")
|
|
}
|
|
}
|
|
|
|
func TestWaitBuffer_WaitFor_AlreadyPresent(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
_, err := wb.Write([]byte("already here"))
|
|
require.NoError(t, err)
|
|
|
|
// Signal is already in the buffer; WaitFor returns immediately.
|
|
require.NoError(t, wb.WaitFor(ctx, "already"))
|
|
}
|
|
|
|
func TestWaitBuffer_WaitFor_ContextExpired(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel() // Already expired.
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
err := wb.WaitFor(ctx, "never")
|
|
require.ErrorIs(t, err, context.Canceled)
|
|
}
|
|
|
|
func TestWaitBuffer_WaitFor_MultipleWrites(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
// Write partial content that doesn't satisfy the condition.
|
|
_, err := wb.Write([]byte("hell"))
|
|
require.NoError(t, err)
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
_ = wb.WaitFor(ctx, "hello")
|
|
}()
|
|
|
|
// Complete the signal with a second write.
|
|
_, err = wb.Write([]byte("o"))
|
|
require.NoError(t, err)
|
|
|
|
select {
|
|
case <-done:
|
|
case <-ctx.Done():
|
|
t.Fatal("WaitFor did not unblock after multiple writes completed the signal")
|
|
}
|
|
}
|
|
|
|
func TestWaitBuffer_WaitForCond(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
// Wait until the buffer has at least 10 bytes.
|
|
_ = wb.WaitForCond(ctx, func(s string) bool {
|
|
return len(s) >= 10
|
|
})
|
|
}()
|
|
|
|
_, err := wb.Write([]byte("12345"))
|
|
require.NoError(t, err)
|
|
_, err = wb.Write([]byte("67890"))
|
|
require.NoError(t, err)
|
|
|
|
select {
|
|
case <-done:
|
|
case <-ctx.Done():
|
|
t.Fatal("WaitForCond did not unblock when condition was met")
|
|
}
|
|
}
|
|
|
|
func TestWaitBuffer_ConcurrentWrites(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
var wg sync.WaitGroup
|
|
const writers = 10
|
|
const iterations = 100
|
|
wg.Add(writers)
|
|
for i := range writers {
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := range iterations {
|
|
_, _ = wb.Write([]byte(fmt.Sprintf("w%d-%d ", i, j)))
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
// Every write should have landed; verify no data was lost by
|
|
// checking the length is at least as large as expected.
|
|
assert.GreaterOrEqual(t, len(wb.Bytes()), writers*iterations)
|
|
}
|
|
|
|
func TestWaitBuffer_WaitFor_BackgroundGoroutine(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel() // Expire immediately.
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
|
|
// WaitFor from a background goroutine should return the
|
|
// context error rather than calling t.Fatal.
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- wb.WaitFor(ctx, "never")
|
|
}()
|
|
|
|
err := <-done
|
|
require.ErrorIs(t, err, context.Canceled)
|
|
}
|
|
|
|
func TestWaitBuffer_SequentialWaits(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
|
|
_, err := wb.Write([]byte("first "))
|
|
require.NoError(t, err)
|
|
require.NoError(t, wb.WaitFor(ctx, "first"))
|
|
|
|
_, err = wb.Write([]byte("second"))
|
|
require.NoError(t, err)
|
|
require.NoError(t, wb.WaitFor(ctx, "second"))
|
|
}
|
|
|
|
func TestWaitBuffer_WaitForNth_Blocks(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
_, err := wb.Write([]byte("Foo "))
|
|
require.NoError(t, err)
|
|
|
|
// First occurrence is already present, but we want two.
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
_ = wb.WaitForNth(ctx, "Foo", 2)
|
|
}()
|
|
|
|
_, err = wb.Write([]byte("Bar Foo"))
|
|
require.NoError(t, err)
|
|
|
|
select {
|
|
case <-done:
|
|
case <-ctx.Done():
|
|
t.Fatal("WaitForNth did not unblock after second occurrence")
|
|
}
|
|
}
|
|
|
|
func TestWaitBuffer_WaitForNth_AlreadySatisfied(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testutil.Context(t, testutil.WaitShort)
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
_, err := wb.Write([]byte("Foo Foo Foo"))
|
|
require.NoError(t, err)
|
|
|
|
// All three occurrences already present.
|
|
require.NoError(t, wb.WaitForNth(ctx, "Foo", 3))
|
|
}
|
|
|
|
func TestWaitBuffer_RequireWaitFor_Timeout(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Use a mock testing.TB to capture the fatal call without
|
|
// killing the real test.
|
|
mock := &tbMock{}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
_, err := wb.Write([]byte("some output"))
|
|
require.NoError(t, err)
|
|
|
|
wb.RequireWaitFor(ctx, mock, "missing-signal")
|
|
assert.True(t, mock.failed(), "expected RequireWaitFor to fail the mock test")
|
|
}
|
|
|
|
// tbMock is a minimal testing.TB that records Fatalf calls.
|
|
type tbMock struct {
|
|
testing.TB // Embed to satisfy the interface.
|
|
mu sync.Mutex
|
|
fatalCalls int
|
|
}
|
|
|
|
func (*tbMock) Helper() {}
|
|
|
|
func (m *tbMock) Fatalf(string, ...any) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.fatalCalls++
|
|
}
|
|
|
|
func (m *tbMock) failed() bool {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.fatalCalls > 0
|
|
}
|
|
|
|
func TestWaitBuffer_Bytes_ReturnsCopy(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
_, err := wb.Write([]byte("original"))
|
|
require.NoError(t, err)
|
|
|
|
b := wb.Bytes()
|
|
// Mutate the returned slice.
|
|
for i := range b {
|
|
b[i] = 'X'
|
|
}
|
|
// The internal buffer must be unchanged.
|
|
require.Equal(t, "original", wb.String())
|
|
}
|
|
|
|
func TestWaitBuffer_PlainBuffer(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
wb := testutil.NewWaitBuffer()
|
|
_, err := wb.Write([]byte("hello "))
|
|
require.NoError(t, err)
|
|
_, err = wb.Write([]byte("world"))
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, "hello world", wb.String())
|
|
require.Equal(t, []byte("hello world"), wb.Bytes())
|
|
}
|