diff --git a/enterprise/tailnet/pgcoord.go b/enterprise/tailnet/pgcoord.go
index 309a591fa6..4b8268e7a5 100644
--- a/enterprise/tailnet/pgcoord.go
+++ b/enterprise/tailnet/pgcoord.go
@@ -739,9 +739,12 @@ func (m *mapper) run() {
 			m.logger.Debug(m.ctx, "skipping nil node update")
 			continue
 		}
-		if err := m.c.Enqueue(update); err != nil {
-			// lots of reasons this could happen, most usually, the peer has disconnected.
-			m.logger.Debug(m.ctx, "failed to enqueue node update", slog.Error(err))
+		for _, chunk := range update.Chunked() {
+			if err := m.c.Enqueue(chunk); err != nil {
+				// lots of reasons this could happen, most usually, the peer has disconnected.
+				m.logger.Debug(m.ctx, "failed to enqueue chunk", slog.Error(err))
+				break
+			}
 		}
 	}
 }
diff --git a/tailnet/peer.go b/tailnet/peer.go
index 34179821a1..1954534a1d 100644
--- a/tailnet/peer.go
+++ b/tailnet/peer.go
@@ -70,14 +70,16 @@ func (p *peer) batchUpdateMappingLocked(others []*peer, k proto.CoordinateRespon
 	if len(req.PeerUpdates) == 0 {
 		return nil
 	}
-	select {
-	case p.resps <- req:
-		p.lastWrite = time.Now()
-		p.logger.Debug(context.Background(), "wrote batched update", slog.F("num_peer_updates", len(req.PeerUpdates)))
-		return nil
-	default:
-		return ErrWouldBlock
+	for _, chunk := range req.Chunked() {
+		select {
+		case p.resps <- chunk:
+			p.lastWrite = time.Now()
+		default:
+			return ErrWouldBlock
+		}
 	}
+	p.logger.Debug(context.Background(), "wrote batched update", slog.F("num_peer_updates", len(req.PeerUpdates)))
+	return nil
 }
 
 var errNoResp = xerrors.New("no response needed")
diff --git a/tailnet/proto/response.go b/tailnet/proto/response.go
new file mode 100644
index 0000000000..ae292b2e61
--- /dev/null
+++ b/tailnet/proto/response.go
@@ -0,0 +1,36 @@
+package proto
+
+// maxPeerUpdatesPerMessage is the maximum number of peer updates that
+// can be sent in a single CoordinateResponse to stay under DRPC
+// message size limits.
+const maxPeerUpdatesPerMessage = 1024
+
+// Chunked splits the response into multiple responses, each containing
+// at most maxPeerUpdatesPerMessage peer updates to stay under the DRPC
+// 4 MiB transport limit. The bound is on the number of updates, not on
+// their encoded size.
+//
+// A response with at most maxPeerUpdatesPerMessage peer updates is
+// returned unchanged (same pointer) as the only element. Otherwise each
+// chunk is a new CoordinateResponse holding consecutive peer updates in
+// their original order. Chunks share the original backing array, but
+// their capacity is clipped so that appending to one chunk cannot
+// overwrite the updates of the next.
+//
+// NOTE(review): only PeerUpdates is carried into the chunks; confirm that
+// no other CoordinateResponse field must survive the split.
+func (r *CoordinateResponse) Chunked() []*CoordinateResponse {
+	updates := r.GetPeerUpdates()
+	if len(updates) <= maxPeerUpdatesPerMessage {
+		return []*CoordinateResponse{r}
+	}
+	numChunks := (len(updates) + maxPeerUpdatesPerMessage - 1) / maxPeerUpdatesPerMessage
+	chunks := make([]*CoordinateResponse, 0, numChunks)
+	for i := 0; i < len(updates); i += maxPeerUpdatesPerMessage {
+		end := min(i+maxPeerUpdatesPerMessage, len(updates))
+		// Three-index slice (cap == len): an append on this chunk
+		// reallocates instead of writing into the following chunk.
+		chunks = append(chunks, &CoordinateResponse{PeerUpdates: updates[i:end:end]})
+	}
+	return chunks
+}
diff --git a/tailnet/proto/response_test.go b/tailnet/proto/response_test.go
new file mode 100644
index 0000000000..dcf73b6800
--- /dev/null
+++ b/tailnet/proto/response_test.go
@@ -0,0 +1,59 @@
+package proto_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/coder/coder/v2/tailnet/proto"
+)
+
+func TestCoordinateResponse_Chunked(t *testing.T) {
+	t.Parallel()
+
+	t.Run("NoChunkingNeeded", func(t *testing.T) {
+		t.Parallel()
+		resp := &proto.CoordinateResponse{
+			PeerUpdates: make([]*proto.CoordinateResponse_PeerUpdate, 100),
+		}
+		chunks := resp.Chunked()
+		require.Len(t, chunks, 1)
+		require.Equal(t, resp, chunks[0])
+	})
+
+	t.Run("ExactLimit", func(t *testing.T) {
+		t.Parallel()
+		resp := &proto.CoordinateResponse{
+			PeerUpdates: make([]*proto.CoordinateResponse_PeerUpdate, 1024),
+		}
+		chunks := resp.Chunked()
+		require.Len(t, chunks, 1)
+		require.Equal(t, resp, chunks[0])
+	})
+
+	t.Run("MultipleChunks", func(t *testing.T) {
+		t.Parallel()
+		n := 1024*3 + 500
+		resp := &proto.CoordinateResponse{
+			PeerUpdates: make([]*proto.CoordinateResponse_PeerUpdate, n),
+		}
+		chunks := resp.Chunked()
+		require.Len(t, chunks, 4)
+		total := 0
+		for _, c := range chunks {
+			updates := c.GetPeerUpdates()
+			require.LessOrEqual(t, len(updates), 1024)
+			// Capacity must be clipped so chunks cannot grow into each other.
+			require.Equal(t, len(updates), cap(updates))
+			total += len(updates)
+		}
+		require.Equal(t, n, total)
+	})
+
+	t.Run("EmptyResponse", func(t *testing.T) {
+		t.Parallel()
+		resp := &proto.CoordinateResponse{}
+		chunks := resp.Chunked()
+		require.Len(t, chunks, 1)
+	})
+}