mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
test(coderd/x/nats): exercise cluster auth in OK test and tighten mismatched-token assertion
This commit is contained in:
@@ -93,9 +93,15 @@ func TestPubsub_SetPeerAddresses(t *testing.T) {
|
||||
t.Parallel()
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
<<<<<<< HEAD
|
||||
a := newTestPubsub(t, clusterTestOptions(t))
|
||||
b := newTestPubsub(t, clusterTestOptions(t))
|
||||
c := newTestPubsub(t, clusterTestOptions(t))
|
||||
=======
|
||||
a := newTestPubsub(t, newClusterOptions(t))
|
||||
b := newTestPubsub(t, newClusterOptions(t))
|
||||
c := newTestPubsub(t, newClusterOptions(t))
|
||||
>>>>>>> 2fb1c9601f (test(coderd/x/nats): exercise cluster auth in OK test and tighten mismatched-token assertion)
|
||||
|
||||
addrB := clusterRouteAddress(t, b)
|
||||
addrC := clusterRouteAddress(t, c)
|
||||
@@ -119,7 +125,11 @@ func TestPubsub_SetPeerAddresses(t *testing.T) {
|
||||
|
||||
t.Run("Closed", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
<<<<<<< HEAD
|
||||
ps := newTestPubsub(t, clusterTestOptions(t))
|
||||
=======
|
||||
ps := newTestPubsub(t, newClusterOptions(t))
|
||||
>>>>>>> 2fb1c9601f (test(coderd/x/nats): exercise cluster auth in OK test and tighten mismatched-token assertion)
|
||||
require.NoError(t, ps.Close())
|
||||
err := ps.SetPeerAddresses(nil)
|
||||
require.True(t, errors.Is(err, errClosed), "got %v", err)
|
||||
@@ -127,7 +137,11 @@ func TestPubsub_SetPeerAddresses(t *testing.T) {
|
||||
|
||||
t.Run("DropsSelfRoute", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
<<<<<<< HEAD
|
||||
ps := newTestPubsub(t, clusterTestOptions(t))
|
||||
=======
|
||||
ps := newTestPubsub(t, newClusterOptions(t))
|
||||
>>>>>>> 2fb1c9601f (test(coderd/x/nats): exercise cluster auth in OK test and tighten mismatched-token assertion)
|
||||
require.NoError(t, ps.SetPeerAddresses([]string{clusterRouteAddress(t, ps)}))
|
||||
require.Empty(t, ps.currentRoutes)
|
||||
})
|
||||
|
||||
@@ -407,48 +407,17 @@ func TestPubsubCluster(t *testing.T) {
|
||||
require.Equal(t, "c-messages-still-work", string(receiveMessage(t, cUnique)))
|
||||
})
|
||||
|
||||
t.Run("ClusterAuthToken", func(t *testing.T) {
|
||||
// MismatchedTokenRejectsRoute asserts that two clustered pubsubs with
|
||||
// different ClusterAuthTokens never successfully establish a route.
|
||||
t.Run("MismatchedTokenRejectsRoute", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
clusterPort := tcpPort(t)
|
||||
a := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.1", clusterPort, "shared"))
|
||||
b := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.2", clusterPort, "shared"))
|
||||
require.NoError(t, a.SetPeerAddresses([]string{clusterRouteAddress(t, b)}))
|
||||
waitForRoutes(t, a, 1)
|
||||
waitForRoutes(t, b, 1)
|
||||
|
||||
event := uniqueSubject("auth-shared")
|
||||
got := make(chan []byte, 1)
|
||||
cancel, err := b.Subscribe(event, func(_ context.Context, msg []byte) {
|
||||
got <- msg
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer cancel()
|
||||
|
||||
publishAndFlush(t, a, event, "with-shared-token")
|
||||
require.Equal(t, "with-shared-token", string(receiveMessage(t, got)))
|
||||
|
||||
mismatchPort := tcpPort(t)
|
||||
c := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.3", mismatchPort, "left"))
|
||||
d := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.4", mismatchPort, "right"))
|
||||
c := newTestPubsub(t, newAuthClusterOptions(t, "left-token"))
|
||||
d := newTestPubsub(t, newAuthClusterOptions(t, "right-token"))
|
||||
require.NoError(t, c.SetPeerAddresses([]string{clusterRouteAddress(t, d)}))
|
||||
|
||||
event = uniqueSubject("auth-mismatch")
|
||||
got = make(chan []byte, 1)
|
||||
cancel, err = d.Subscribe(event, func(_ context.Context, msg []byte) {
|
||||
got <- msg
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer cancel()
|
||||
|
||||
publishAndFlush(t, c, event, "with-mismatched-token")
|
||||
require.Never(t, func() bool {
|
||||
select {
|
||||
case <-got:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return c.ns.NumRoutes() > 0 || d.ns.NumRoutes() > 0
|
||||
}, testutil.IntervalMedium, testutil.IntervalFast)
|
||||
})
|
||||
|
||||
@@ -1022,23 +991,6 @@ func TestPubsubCluster(t *testing.T) {
|
||||
)
|
||||
require.ErrorContains(t, err, "not started with clustering enabled")
|
||||
})
|
||||
|
||||
t.Run("SetPeerAddressesClosed", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ps := newTestPubsub(t, newClusterTestOptions(t))
|
||||
require.NoError(t, ps.Close())
|
||||
err := ps.SetPeerAddresses(nil)
|
||||
require.True(t, errors.Is(err, errClosed), "got %v", err)
|
||||
})
|
||||
|
||||
t.Run("SetPeerAddressesDropsSelfRoute", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ps := newTestPubsub(t, newClusterTestOptions(t))
|
||||
require.NoError(t, ps.SetPeerAddresses([]string{clusterRouteAddress(t, ps)}))
|
||||
require.Empty(t, ps.currentRoutes)
|
||||
})
|
||||
}
|
||||
|
||||
func defaultTestOptions() Options {
|
||||
|
||||
Reference in New Issue
Block a user