From 7c948a7ad83875a2fd7bef2cfdd2ed1e03de7f3d Mon Sep 17 00:00:00 2001 From: Michael Suchacz <203725896+ibetitsmike@users.noreply.github.com> Date: Fri, 23 Jan 2026 08:20:13 +0100 Subject: [PATCH] test: make backedpipe ForceReconnect tests deterministic (#21635) --- .../immortalstreams/backedpipe/backed_pipe.go | 21 +++++++++++++++++-- .../backedpipe/backed_pipe_test.go | 16 +++++++------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/agent/immortalstreams/backedpipe/backed_pipe.go b/agent/immortalstreams/backedpipe/backed_pipe.go index 4b7a9f0300..35d1863e96 100644 --- a/agent/immortalstreams/backedpipe/backed_pipe.go +++ b/agent/immortalstreams/backedpipe/backed_pipe.go @@ -81,6 +81,10 @@ type BackedPipe struct { // Unified error handling with generation filtering errChan chan ErrorEvent + // forceReconnectHook is a test hook invoked after ForceReconnect registers + // with the singleflight group. + forceReconnectHook func() + // singleflight group to dedupe concurrent ForceReconnect calls sf singleflight.Group @@ -324,6 +328,13 @@ func (bp *BackedPipe) handleConnectionError(errorEvt ErrorEvent) { } } +// SetForceReconnectHookForTests sets a hook invoked after ForceReconnect +// registers with the singleflight group. It must be set before any +// concurrent ForceReconnect calls. +func (bp *BackedPipe) SetForceReconnectHookForTests(hook func()) { + bp.forceReconnectHook = hook +} + // ForceReconnect forces a reconnection attempt immediately. // This can be used to force a reconnection if a new connection is established. // It prevents duplicate reconnections when called concurrently. @@ -331,7 +342,7 @@ func (bp *BackedPipe) ForceReconnect() error { // Deduplicate concurrent ForceReconnect calls so only one reconnection // attempt runs at a time from this API. Use the pipe's internal context // to ensure Close() cancels any in-flight attempt. - _, err, _ := bp.sf.Do("force-reconnect", func() (interface{}, error) { + resultChan := bp.sf.DoChan("force-reconnect", func() (interface{}, error) { bp.mu.Lock() defer bp.mu.Unlock() @@ -346,5 +357,11 @@ func (bp *BackedPipe) ForceReconnect() error { return nil, bp.reconnectLocked() }) - return err + + if hook := bp.forceReconnectHook; hook != nil { + hook() + } + + result := <-resultChan + return result.Err } diff --git a/agent/immortalstreams/backedpipe/backed_pipe_test.go b/agent/immortalstreams/backedpipe/backed_pipe_test.go index 57d5a4724d..5e81cf7c4e 100644 --- a/agent/immortalstreams/backedpipe/backed_pipe_test.go +++ b/agent/immortalstreams/backedpipe/backed_pipe_test.go @@ -742,12 +742,15 @@ func TestBackedPipe_DuplicateReconnectionPrevention(t *testing.T) { const numConcurrent = 3 startSignals := make([]chan struct{}, numConcurrent) - startedSignals := make([]chan struct{}, numConcurrent) for i := range startSignals { startSignals[i] = make(chan struct{}) - startedSignals[i] = make(chan struct{}) } + enteredSignals := make(chan struct{}, numConcurrent) + bp.SetForceReconnectHookForTests(func() { + enteredSignals <- struct{}{} + }) + errors := make([]error, numConcurrent) var wg sync.WaitGroup @@ -758,15 +761,12 @@ func TestBackedPipe_DuplicateReconnectionPrevention(t *testing.T) { defer wg.Done() // Wait for the signal to start <-startSignals[idx] - // Signal that we're about to call ForceReconnect - close(startedSignals[idx]) errors[idx] = bp.ForceReconnect() }(i) } // Start the first ForceReconnect and wait for it to block close(startSignals[0]) - <-startedSignals[0] // Wait for the first reconnect to actually start and block testutil.RequireReceive(testCtx, t, blockedChan) @@ -777,9 +777,9 @@ func TestBackedPipe_DuplicateReconnectionPrevention(t *testing.T) { close(startSignals[i]) } - // Wait for all additional goroutines to have started their calls - for i := 1; i < numConcurrent; i++ { - <-startedSignals[i] + // Wait for all ForceReconnect calls to join the singleflight operation. + for i := 0; i < numConcurrent; i++ { + testutil.RequireReceive(testCtx, t, enteredSignals) } // At this point, one reconnect has started and is blocked,