diff --git a/coderd/x/nats/cluster_internal_test.go b/coderd/x/nats/cluster_internal_test.go index eadf2e561f..c2d960d503 100644 --- a/coderd/x/nats/cluster_internal_test.go +++ b/coderd/x/nats/cluster_internal_test.go @@ -93,9 +93,15 @@ func TestPubsub_SetPeerAddresses(t *testing.T) { t.Parallel() t.Run("OK", func(t *testing.T) { t.Parallel() +<<<<<<< HEAD a := newTestPubsub(t, clusterTestOptions(t)) b := newTestPubsub(t, clusterTestOptions(t)) c := newTestPubsub(t, clusterTestOptions(t)) +======= + a := newTestPubsub(t, newClusterOptions(t)) + b := newTestPubsub(t, newClusterOptions(t)) + c := newTestPubsub(t, newClusterOptions(t)) +>>>>>>> 2fb1c9601f (test(coderd/x/nats): exercise cluster auth in OK test and tighten mismatched-token assertion) addrB := clusterRouteAddress(t, b) addrC := clusterRouteAddress(t, c) @@ -119,7 +125,11 @@ func TestPubsub_SetPeerAddresses(t *testing.T) { t.Run("Closed", func(t *testing.T) { t.Parallel() +<<<<<<< HEAD ps := newTestPubsub(t, clusterTestOptions(t)) +======= + ps := newTestPubsub(t, newClusterOptions(t)) +>>>>>>> 2fb1c9601f (test(coderd/x/nats): exercise cluster auth in OK test and tighten mismatched-token assertion) require.NoError(t, ps.Close()) err := ps.SetPeerAddresses(nil) require.True(t, errors.Is(err, errClosed), "got %v", err) @@ -127,7 +137,11 @@ func TestPubsub_SetPeerAddresses(t *testing.T) { t.Run("DropsSelfRoute", func(t *testing.T) { t.Parallel() +<<<<<<< HEAD ps := newTestPubsub(t, clusterTestOptions(t)) +======= + ps := newTestPubsub(t, newClusterOptions(t)) +>>>>>>> 2fb1c9601f (test(coderd/x/nats): exercise cluster auth in OK test and tighten mismatched-token assertion) require.NoError(t, ps.SetPeerAddresses([]string{clusterRouteAddress(t, ps)})) require.Empty(t, ps.currentRoutes) }) diff --git a/coderd/x/nats/pubsub_internal_test.go b/coderd/x/nats/pubsub_internal_test.go index 17e3d5f61f..3838248b4f 100644 --- a/coderd/x/nats/pubsub_internal_test.go +++ b/coderd/x/nats/pubsub_internal_test.go @@ -407,48 +407,17 @@ func TestPubsubCluster(t *testing.T) { require.Equal(t, "c-messages-still-work", string(receiveMessage(t, cUnique))) }) - t.Run("ClusterAuthToken", func(t *testing.T) { + // MismatchedTokenRejectsRoute asserts that two clustered pubsubs with + // different ClusterAuthTokens never successfully establish a route. + t.Run("MismatchedTokenRejectsRoute", func(t *testing.T) { t.Parallel() - clusterPort := tcpPort(t) - a := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.1", clusterPort, "shared")) - b := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.2", clusterPort, "shared")) - require.NoError(t, a.SetPeerAddresses([]string{clusterRouteAddress(t, b)})) - waitForRoutes(t, a, 1) - waitForRoutes(t, b, 1) - - event := uniqueSubject("auth-shared") - got := make(chan []byte, 1) - cancel, err := b.Subscribe(event, func(_ context.Context, msg []byte) { - got <- msg - }) - require.NoError(t, err) - defer cancel() - - publishAndFlush(t, a, event, "with-shared-token") - require.Equal(t, "with-shared-token", string(receiveMessage(t, got))) - - mismatchPort := tcpPort(t) - c := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.3", mismatchPort, "left")) - d := newTestPubsub(t, newClusterTestOptionsWithHostAndToken("127.0.0.4", mismatchPort, "right")) + c := newTestPubsub(t, newAuthClusterOptions(t, "left-token")) + d := newTestPubsub(t, newAuthClusterOptions(t, "right-token")) require.NoError(t, c.SetPeerAddresses([]string{clusterRouteAddress(t, d)})) - event = uniqueSubject("auth-mismatch") - got = make(chan []byte, 1) - cancel, err = d.Subscribe(event, func(_ context.Context, msg []byte) { - got <- msg - }) - require.NoError(t, err) - defer cancel() - - publishAndFlush(t, c, event, "with-mismatched-token") require.Never(t, func() bool { - select { - case <-got: - return true - default: - return false - } + return c.ns.NumRoutes() > 0 || d.ns.NumRoutes() > 0 }, testutil.IntervalMedium, testutil.IntervalFast) }) @@ -1022,23 +991,6 @@ func TestPubsubCluster(t *testing.T) { ) require.ErrorContains(t, err, "not started with clustering enabled") }) - - t.Run("SetPeerAddressesClosed", func(t *testing.T) { - t.Parallel() - - ps := newTestPubsub(t, newClusterTestOptions(t)) - require.NoError(t, ps.Close()) - err := ps.SetPeerAddresses(nil) - require.True(t, errors.Is(err, errClosed), "got %v", err) - }) - - t.Run("SetPeerAddressesDropsSelfRoute", func(t *testing.T) { - t.Parallel() - - ps := newTestPubsub(t, newClusterTestOptions(t)) - require.NoError(t, ps.SetPeerAddresses([]string{clusterRouteAddress(t, ps)})) - require.Empty(t, ps.currentRoutes) - }) } func defaultTestOptions() Options {