Files
coder/coderd/x/nats/pubsub_test.go
T
2026-05-27 12:57:20 -05:00

200 lines
4.7 KiB
Go

package nats_test
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"cdr.dev/slog/v3"
"cdr.dev/slog/v3/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/pubsub"
xnats "github.com/coder/coder/v2/coderd/x/nats"
"github.com/coder/coder/v2/testutil"
)
func newTestPubsub(t *testing.T, opts xnats.Options) *xnats.Pubsub {
t.Helper()
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
ctx, cancel := context.WithCancel(context.Background())
ps, err := xnats.New(ctx, logger, opts)
require.NoError(t, err)
t.Cleanup(func() {
_ = ps.Close()
cancel()
})
return ps
}
func TestPubsub(t *testing.T) {
t.Parallel()
t.Run("RoundTrip", func(t *testing.T) {
t.Parallel()
ps := newTestPubsub(t, xnats.Options{})
got := make(chan []byte, 1)
cancel, err := ps.Subscribe("test_event", func(_ context.Context, msg []byte) {
got <- msg
})
require.NoError(t, err)
defer cancel()
require.NoError(t, ps.Publish("test_event", []byte("hello")))
select {
case msg := <-got:
assert.Equal(t, "hello", string(msg))
case <-time.After(testutil.WaitShort):
t.Fatal("timed out waiting for message")
}
})
t.Run("SubscribeWithErrNormalMessage", func(t *testing.T) {
t.Parallel()
ps := newTestPubsub(t, xnats.Options{})
got := make(chan []byte, 1)
cancel, err := ps.SubscribeWithErr("evt", func(_ context.Context, msg []byte, err error) {
assert.NoError(t, err)
got <- msg
})
require.NoError(t, err)
defer cancel()
require.NoError(t, ps.Publish("evt", []byte("payload")))
select {
case msg := <-got:
assert.Equal(t, "payload", string(msg))
case <-time.After(testutil.WaitShort):
t.Fatal("timed out waiting for message")
}
})
t.Run("EchoDefault", func(t *testing.T) {
t.Parallel()
ps := newTestPubsub(t, xnats.Options{})
got := make(chan []byte, 1)
cancel, err := ps.Subscribe("echo_evt", func(_ context.Context, msg []byte) {
got <- msg
})
require.NoError(t, err)
defer cancel()
require.NoError(t, ps.Publish("echo_evt", []byte("data")))
select {
case msg := <-got:
assert.Equal(t, "data", string(msg))
case <-time.After(testutil.WaitShort):
t.Fatal("default should echo own messages")
}
})
t.Run("Ordering", func(t *testing.T) {
t.Parallel()
ps := newTestPubsub(t, xnats.Options{})
const n = 100
got := make(chan []byte, n)
cancel, err := ps.Subscribe("ord_evt", func(_ context.Context, msg []byte) {
got <- msg
})
require.NoError(t, err)
defer cancel()
for i := 0; i < n; i++ {
require.NoError(t, ps.Publish("ord_evt", []byte(fmt.Sprintf("%d", i))))
}
deadline := time.After(testutil.WaitLong)
for i := 0; i < n; i++ {
select {
case msg := <-got:
assert.Equal(t, fmt.Sprintf("%d", i), string(msg))
case <-deadline:
t.Fatalf("timed out at message %d/%d", i, n)
}
}
})
t.Run("CloseIdempotent", func(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
ps, err := xnats.New(ctx, logger, xnats.Options{})
require.NoError(t, err)
var first, second error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
first = ps.Close()
}()
wg.Wait()
second = ps.Close()
assert.NoError(t, first)
assert.NoError(t, second)
})
t.Run("SubscribeWithErrReceivesDropError", func(t *testing.T) {
t.Parallel()
ps := newTestPubsub(t, xnats.Options{
PendingLimits: xnats.PendingLimits{Msgs: 1, Bytes: 1024 * 1024},
})
const event = "slow_evt_sync"
started := make(chan struct{})
release := make(chan struct{})
dropped := make(chan error, 1)
var startedOnce sync.Once
var releaseOnce sync.Once
defer releaseOnce.Do(func() { close(release) })
cancel, err := ps.SubscribeWithErr(event, func(_ context.Context, _ []byte, err error) {
if err != nil {
select {
case dropped <- err:
default:
}
return
}
startedOnce.Do(func() {
close(started)
<-release
})
})
require.NoError(t, err)
defer cancel()
require.NoError(t, ps.Publish(event, []byte("first")))
require.NoError(t, ps.Flush())
select {
case <-started:
case <-time.After(testutil.WaitShort):
t.Fatal("timed out waiting for first callback")
}
for i := 0; i < 8; i++ {
require.NoError(t, ps.Publish(event, []byte("burst")))
}
require.NoError(t, ps.Flush())
releaseOnce.Do(func() { close(release) })
select {
case err := <-dropped:
assert.ErrorIs(t, err, pubsub.ErrDroppedMessages)
case <-time.After(testutil.WaitLong):
t.Fatal("timed out waiting for drop error")
}
})
}